This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 09f7287cda5 [Opt](cloud) Notify BE to make committed rowset visible 
directly after txn commit without fetching from meta service (#59754)
09f7287cda5 is described below

commit 09f7287cda507b1e7809036244859d38d6a54244
Author: bobhan1 <[email protected]>
AuthorDate: Wed Mar 18 10:54:56 2026 +0800

    [Opt](cloud) Notify BE to make committed rowset visible directly after txn 
commit without fetching from meta service (#59754)
    
    ### What problem does this PR solve?
    
    ## Test
    
    1FE+3BE, 400bucket mow table, 10 concurrency stream load
    
    before:
    <img width="3218" height="1300" alt="image"
    
src="https://github.com/user-attachments/assets/3d4fe51d-4fae-484b-a257-a6b5ec010970";
    />
    after:
    <img width="3206" height="1304" alt="image"
    
src="https://github.com/user-attachments/assets/e7230b3c-999d-407b-aa57-783492995e5b";
    />
    
    ---
    
    
      Problem Summary
    
    In cloud storage mode, after a load transaction commits rowsets to Meta
    Service (MS), BE nodes need to fetch the committed rowset metadata from
    MS during subsequent sync_rowsets() operations. This introduces
    additional latency and MS query overhead, especially for high-frequency
    import scenarios.
    
    This PR implements a notification mechanism where FE/BE directly
    notifies BE nodes about committed rowsets after transaction commit,
    allowing BE to update tablet metadata immediately without fetching from
    MS.
    
      Solution
    
    1. CloudCommittedRSMgr (BE): A new in-memory manager that caches
    committed temporary rowset metadata after they are committed to MS
        - Stores rowset meta with expiration time
        - Supports efficient lookup by (txn_id, tablet_id)
        - Handles empty rowset markers
        - Background thread for cleaning expired entries
      2. Notification Flow:
    - When FE commits a load transaction to MS, it sends
    MakeCloudTmpRsVisibleTask to involved BE nodes
        - BE receives the task with final version/visible_ts information
        - BE promotes cached rowset metadata to tablet meta directly
    - BE can forward notification to FE if needed (for tablets not on the
    original BE)
      3. Configuration:
        - FE: enable_cloud_notify_be_after_load_txn_commit (default: false)
        - BE: enable_cloud_make_rs_visible_on_be (default: false)
        - BE: cloud_make_committed_rs_visible_worker_count (default: 4)
        - BE: cloud_mow_sync_rowsets_when_load_txn_begin (default: true)
    
      Benefits
    
    - Reduces Meta Service query pressure by avoiding redundant rowset
    metadata fetches
      - Improves import latency by making rowsets visible faster
      - Optimizes cloud storage mode performance for high-frequency imports
---
 be/src/agent/agent_server.cpp                      |   6 +
 be/src/agent/task_worker_pool.cpp                  |  52 +++
 be/src/agent/task_worker_pool.h                    |   3 +
 be/src/cloud/cloud_committed_rs_mgr.cpp            | 142 +++++++
 be/src/cloud/cloud_committed_rs_mgr.h              |  87 +++++
 be/src/cloud/cloud_delete_task.cpp                 |  10 +-
 be/src/cloud/cloud_delta_writer.cpp                |   4 +-
 be/src/cloud/cloud_delta_writer.h                  |   2 +-
 be/src/cloud/cloud_meta_mgr.cpp                    |  23 +-
 be/src/cloud/cloud_meta_mgr.h                      |   4 +-
 be/src/cloud/cloud_rowset_builder.cpp              |  21 +-
 be/src/cloud/cloud_rowset_builder.h                |   2 +-
 be/src/cloud/cloud_storage_engine.cpp              |   3 +
 be/src/cloud/cloud_storage_engine.h                |   4 +
 be/src/cloud/cloud_tablet.cpp                      | 169 +++++++++
 be/src/cloud/cloud_tablet.h                        |  29 ++
 be/src/cloud/cloud_tablet_mgr.cpp                  |   4 +
 be/src/cloud/cloud_tablets_channel.cpp             |   4 +-
 be/src/cloud/cloud_txn_delete_bitmap_cache.cpp     |  41 +++
 be/src/cloud/cloud_txn_delete_bitmap_cache.h       |   4 +
 be/src/common/config.cpp                           |   6 +
 be/src/common/config.h                             |   6 +
 be/src/storage/rowset/rowset_meta.h                |  12 +-
 be/test/cloud/cloud_committed_rs_mgr_test.cpp      | 408 +++++++++++++++++++++
 be/test/cloud/cloud_tablet_test.cpp                | 365 ++++++++++++++++++
 .../main/java/org/apache/doris/common/Config.java  |   3 +
 .../transaction/CloudGlobalTransactionMgr.java     | 138 ++++++-
 .../apache/doris/service/FrontendServiceImpl.java  |   2 +-
 .../java/org/apache/doris/task/AgentBatchTask.java |  10 +
 .../doris/task/MakeCloudTmpRsVisibleTask.java      |  73 ++++
 .../doris/transaction/GlobalTransactionMgr.java    |   2 +-
 .../transaction/GlobalTransactionMgrIface.java     |   2 +-
 gensrc/thrift/AgentService.thrift                  |   9 +
 gensrc/thrift/Types.thrift                         |   3 +-
 ...loud_dup_forward_notify_be_after_txn_commit.out |  20 +
 .../test_cloud_dup_notify_be_after_txn_commit.out  |  36 ++
 ...t_cloud_empty_rs_notify_be_after_txn_commit.out |  38 ++
 .../test_cloud_mow_notify_be_after_txn_commit.out  |  14 +
 .../pipeline/cloud_p0/conf/be_custom.conf          |   3 +
 .../pipeline/cloud_p0/conf/fe_custom.conf          |   4 +-
 .../pipeline/cloud_p1/conf/be_custom.conf          |   3 +
 .../pipeline/cloud_p1/conf/fe_custom.conf          |   2 +
 ...d_dup_forward_notify_be_after_txn_commit.groovy | 131 +++++++
 ...est_cloud_dup_notify_be_after_txn_commit.groovy | 161 ++++++++
 ...loud_empty_rs_notify_be_after_txn_commit.groovy | 191 ++++++++++
 ...est_cloud_mow_notify_be_after_txn_commit.groovy |  84 +++++
 46 files changed, 2307 insertions(+), 33 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index a60686637fd..b13af97e717 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -240,6 +240,12 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& 
engine, ExecEnv* exec_
             "ALTER_INVERTED_INDEX", config::alter_index_worker_count,
             [&engine](auto&& task) { return alter_cloud_index_callback(engine, 
task); });
 
+    _workers[TTaskType::MAKE_CLOUD_COMMITTED_RS_VISIBLE] = 
std::make_unique<TaskWorkerPool>(
+            "MAKE_CLOUD_COMMITTED_RS_VISIBLE", 
config::cloud_make_committed_rs_visible_worker_count,
+            [&engine](auto&& task) {
+                return make_cloud_committed_rs_visible_callback(engine, task);
+            });
+
     _report_workers.push_back(std::make_unique<ReportWorker>(
             "REPORT_TASK", _cluster_info, config::report_task_interval_seconds,
             [&cluster_info = _cluster_info] { 
report_task_callback(cluster_info); }));
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 415c8b6742a..2753a527a4b 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -51,6 +51,7 @@
 #include "cloud/cloud_schema_change_job.h"
 #include "cloud/cloud_snapshot_loader.h"
 #include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_tablet.h"
 #include "cloud/cloud_tablet_mgr.h"
 #include "cloud/config.h"
 #include "common/config.h"
@@ -2393,6 +2394,57 @@ void calc_delete_bitmap_callback(CloudStorageEngine& 
engine, const TAgentTaskReq
     remove_task_info(req.task_type, req.signature);
 }
 
+void make_cloud_committed_rs_visible_callback(CloudStorageEngine& engine,
+                                              const TAgentTaskRequest& req) {
+    if (!config::enable_cloud_make_rs_visible_on_be) {
+        return;
+    }
+    LOG(INFO) << "begin to make cloud tmp rs visible, txn_id="
+              << req.make_cloud_tmp_rs_visible_req.txn_id
+              << ", tablet_count=" << 
req.make_cloud_tmp_rs_visible_req.tablet_ids.size();
+
+    const auto& make_visible_req = req.make_cloud_tmp_rs_visible_req;
+    auto& tablet_mgr = engine.tablet_mgr();
+
+    int64_t txn_id = make_visible_req.txn_id;
+    int64_t version_update_time_ms = 
make_visible_req.__isset.version_update_time_ms
+                                             ? 
make_visible_req.version_update_time_ms
+                                             : 0;
+
+    // Process each tablet involved in this transaction on this BE
+    for (int64_t tablet_id : make_visible_req.tablet_ids) {
+        auto tablet_result =
+                tablet_mgr.get_tablet(tablet_id, /* warmup_data */ false,
+                                      /* sync_delete_bitmap */ false,
+                                      /* sync_stats */ nullptr, /* 
force_use_only_cached */ true,
+                                      /* cache_on_miss */ false);
+        if (!tablet_result.has_value()) {
+            continue;
+        }
+        auto cloud_tablet = tablet_result.value();
+
+        int64_t partition_id = cloud_tablet->partition_id();
+        auto version_iter = 
make_visible_req.partition_version_map.find(partition_id);
+        if (version_iter == make_visible_req.partition_version_map.end()) {
+            continue;
+        }
+        int64_t visible_version = version_iter->second;
+        DBUG_EXECUTE_IF("make_cloud_committed_rs_visible_callback.block", {
+            auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+            auto target_table_id = dp->param<int64_t>("table_id", -1);
+            auto version = dp->param<int64_t>("version", -1);
+            if ((target_tablet_id == tablet_id || target_table_id == 
cloud_tablet->table_id()) &&
+                version == visible_version) {
+                DBUG_BLOCK
+            }
+        });
+        cloud_tablet->try_make_committed_rs_visible(txn_id, visible_version,
+                                                    version_update_time_ms);
+    }
+    LOG(INFO) << "make cloud tmp rs visible finished, txn_id=" << txn_id
+              << ", processed_tablets=" << make_visible_req.tablet_ids.size();
+}
+
 void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) 
{
     LOG(INFO) << "clean trash start";
     DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); })
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 300e1daa606..e5782055267 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -205,6 +205,9 @@ void report_tablet_callback(CloudStorageEngine& engine, 
const ClusterInfo* clust
 
 void calc_delete_bitmap_callback(CloudStorageEngine& engine, const 
TAgentTaskRequest& req);
 
+void make_cloud_committed_rs_visible_callback(CloudStorageEngine& engine,
+                                              const TAgentTaskRequest& req);
+
 void report_index_policy_callback(const ClusterInfo* cluster_info);
 
 } // namespace doris
diff --git a/be/src/cloud/cloud_committed_rs_mgr.cpp 
b/be/src/cloud/cloud_committed_rs_mgr.cpp
new file mode 100644
index 00000000000..3d96b7ca7f7
--- /dev/null
+++ b/be/src/cloud/cloud_committed_rs_mgr.cpp
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_committed_rs_mgr.h"
+
+#include <chrono>
+
+#include "cloud/config.h"
+#include "common/logging.h"
+#include "storage/rowset/rowset_meta.h"
+#include "util/thread.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+CloudCommittedRSMgr::CloudCommittedRSMgr() : _stop_latch(1) {}
+
+CloudCommittedRSMgr::~CloudCommittedRSMgr() {
+    _stop_latch.count_down();
+    if (_clean_thread) {
+        _clean_thread->join();
+    }
+}
+
+Status CloudCommittedRSMgr::init() {
+    auto st = Thread::create(
+            "CloudCommittedRSMgr", "clean_committed_rs_thread",
+            [this]() { this->_clean_thread_callback(); }, &_clean_thread);
+    if (!st.ok()) {
+        LOG(WARNING) << "failed to create thread for CloudCommittedRSMgr, 
error: " << st;
+    }
+    return st;
+}
+
+void CloudCommittedRSMgr::add_committed_rowset(int64_t txn_id, int64_t 
tablet_id,
+                                               RowsetMetaSharedPtr rowset_meta,
+                                               int64_t expiration_time) {
+    int64_t txn_expiration_min =
+            
duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
+                    .count() +
+            config::tablet_txn_info_min_expired_seconds;
+    expiration_time = std::max(txn_expiration_min, expiration_time);
+    std::unique_lock<std::shared_mutex> wlock(_rwlock);
+    TxnTabletKey key(txn_id, tablet_id);
+    _committed_rs_map.insert_or_assign(key, CommittedRowsetValue(rowset_meta, 
expiration_time));
+    _expiration_map.emplace(expiration_time, key);
+    LOG(INFO) << "add pending rowset, txn_id=" << txn_id << ", tablet_id=" << 
tablet_id
+              << ", rowset_id=" << rowset_meta->rowset_id().to_string()
+              << ", expiration_time=" << expiration_time;
+}
+
+Result<std::pair<RowsetMetaSharedPtr, int64_t>> 
CloudCommittedRSMgr::get_committed_rowset(
+        int64_t txn_id, int64_t tablet_id) {
+    std::shared_lock<std::shared_mutex> rlock(_rwlock);
+    TxnTabletKey key(txn_id, tablet_id);
+    if (auto it = _empty_rowset_markers.find(key); it != 
_empty_rowset_markers.end()) {
+        return std::make_pair(nullptr, it->second);
+    }
+    auto iter = _committed_rs_map.find(key);
+    if (iter == _committed_rs_map.end()) {
+        return ResultError(Status::Error<ErrorCode::NOT_FOUND>(
+                "committed rowset not found, txn_id={}, tablet_id={}", txn_id, 
tablet_id));
+    }
+    return std::make_pair(iter->second.rowset_meta, 
iter->second.expiration_time);
+}
+
+void CloudCommittedRSMgr::remove_committed_rowset(int64_t txn_id, int64_t 
tablet_id) {
+    std::unique_lock<std::shared_mutex> wlock(_rwlock);
+    _committed_rs_map.erase({txn_id, tablet_id});
+}
+
+void CloudCommittedRSMgr::remove_expired_committed_rowsets() {
+    std::unique_lock<std::shared_mutex> wlock(_rwlock);
+    int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
+                                   
std::chrono::system_clock::now().time_since_epoch())
+                                   .count();
+
+    while (!_expiration_map.empty()) {
+        auto iter = _expiration_map.begin();
+        if (!_committed_rs_map.contains(iter->second) &&
+            !_empty_rowset_markers.contains(iter->second)) {
+            _expiration_map.erase(iter);
+            continue;
+        }
+        int64_t expiration_time = iter->first;
+        if (expiration_time > current_time) {
+            break;
+        }
+
+        auto key = iter->second;
+        _expiration_map.erase(iter);
+
+        auto it_rs = _committed_rs_map.find(key);
+        if (it_rs != _committed_rs_map.end() && it_rs->second.expiration_time 
== expiration_time) {
+            _committed_rs_map.erase(it_rs);
+            LOG(INFO) << "clean expired pending cloud rowset, txn_id=" << 
key.txn_id
+                      << ", tablet_id=" << key.tablet_id << ", 
expiration_time=" << expiration_time;
+        }
+        auto it_empty = _empty_rowset_markers.find(key);
+        if (it_empty != _empty_rowset_markers.end() && it_empty->second == 
expiration_time) {
+            _empty_rowset_markers.erase(it_empty);
+            LOG(INFO) << "clean expired empty rowset marker, txn_id=" << 
key.txn_id
+                      << ", tablet_id=" << key.tablet_id << ", 
expiration_time=" << expiration_time;
+        }
+    }
+}
+
+void CloudCommittedRSMgr::mark_empty_rowset(int64_t txn_id, int64_t tablet_id,
+                                            int64_t txn_expiration) {
+    int64_t txn_expiration_min =
+            
duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
+                    .count() +
+            config::tablet_txn_info_min_expired_seconds;
+    txn_expiration = std::max(txn_expiration_min, txn_expiration);
+
+    std::unique_lock<std::shared_mutex> wlock(_rwlock);
+    TxnTabletKey txn_key(txn_id, tablet_id);
+    _empty_rowset_markers.insert_or_assign(txn_key, txn_expiration);
+    _expiration_map.emplace(txn_expiration, txn_key);
+}
+
+void CloudCommittedRSMgr::_clean_thread_callback() {
+    do {
+        remove_expired_committed_rowsets();
+    } while (!_stop_latch.wait_for(
+            
std::chrono::seconds(config::remove_expired_tablet_txn_info_interval_seconds)));
+}
+#include "common/compile_check_end.h"
+} // namespace doris
diff --git a/be/src/cloud/cloud_committed_rs_mgr.h 
b/be/src/cloud/cloud_committed_rs_mgr.h
new file mode 100644
index 00000000000..33af0ba3979
--- /dev/null
+++ b/be/src/cloud/cloud_committed_rs_mgr.h
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <shared_mutex>
+
+#include "common/status.h"
+#include "storage/rowset/rowset_fwd.h"
+#include "util/countdown_latch.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+class Thread;
+
+// Manages temporary rowset meta for cloud storage transactions in memory.
+// This cache stores rowset meta produced during import operations after they
+// are committed to MS. After the load txn was committed in MS finally, FE/BE 
will
+// notifies the final version/visible_ts, BE can update and promote these
+// temporary rowsets to the tablet meta without fetching from MS in later 
sync_rowsets().
+class CloudCommittedRSMgr {
+public:
+    CloudCommittedRSMgr();
+    ~CloudCommittedRSMgr();
+
+    Status init();
+
+    void add_committed_rowset(int64_t txn_id, int64_t tablet_id, 
RowsetMetaSharedPtr rowset_meta,
+                              int64_t expiration_time);
+
+    Result<std::pair<RowsetMetaSharedPtr, int64_t>> 
get_committed_rowset(int64_t txn_id,
+                                                                         
int64_t tablet_id);
+
+    void remove_committed_rowset(int64_t txn_id, int64_t tablet_id);
+
+    void remove_expired_committed_rowsets();
+
+    void mark_empty_rowset(int64_t txn_id, int64_t tablet_id, int64_t 
txn_expiration);
+
+private:
+    void _clean_thread_callback();
+
+    struct TxnTabletKey {
+        int64_t txn_id;
+        int64_t tablet_id;
+
+        TxnTabletKey(int64_t txn_id_, int64_t tablet_id_)
+                : txn_id(txn_id_), tablet_id(tablet_id_) {}
+
+        auto operator<=>(const TxnTabletKey&) const = default;
+    };
+
+    struct CommittedRowsetValue {
+        RowsetMetaSharedPtr rowset_meta;
+        int64_t expiration_time; // seconds since epoch
+
+        CommittedRowsetValue(RowsetMetaSharedPtr rowset_meta_, int64_t 
expiration_time_)
+                : rowset_meta(std::move(rowset_meta_)), 
expiration_time(expiration_time_) {}
+    };
+
+    // Map: <txn_id, tablet_id> -> <rowset_meta, expiration_time>
+    std::map<TxnTabletKey, CommittedRowsetValue> _committed_rs_map;
+    // Multimap for efficient expiration cleanup: expiration_time -> <txn_id, 
tablet_id>
+    std::multimap<int64_t, TxnTabletKey> _expiration_map;
+    std::map<TxnTabletKey, int64_t /* expiration_time */> 
_empty_rowset_markers;
+    std::shared_mutex _rwlock;
+    std::shared_ptr<Thread> _clean_thread;
+    CountDownLatch _stop_latch;
+};
+#include "common/compile_check_end.h"
+} // namespace doris
diff --git a/be/src/cloud/cloud_delete_task.cpp 
b/be/src/cloud/cloud_delete_task.cpp
index 0b51da4eafd..dc3d991df58 100644
--- a/be/src/cloud/cloud_delete_task.cpp
+++ b/be/src/cloud/cloud_delete_task.cpp
@@ -103,19 +103,27 @@ Status CloudDeleteTask::execute(CloudStorageEngine& 
engine, const TPushReq& requ
     }
 
     st = engine.meta_mgr().commit_rowset(*rowset->rowset_meta(), "");
+    if (!st.ok()) {
+        LOG(WARNING) << "failed to commit rowset, status=" << st.to_string();
+        return st;
+    }
 
     // Update tablet stats
     tablet->fetch_add_approximate_num_rowsets(1);
     tablet->fetch_add_approximate_cumu_num_rowsets(1);
 
     // TODO(liaoxin) delete operator don't send calculate delete bitmap task 
from fe,
-    //  then we don't need to set_txn_related_delete_bitmap here.
+    //  then we don't need to set_txn_related_info here.
     if (tablet->enable_unique_key_merge_on_write()) {
         DeleteBitmapPtr delete_bitmap = 
std::make_shared<DeleteBitmap>(tablet->tablet_id());
         RowsetIdUnorderedSet rowset_ids;
         engine.txn_delete_bitmap_cache().set_tablet_txn_info(
                 request.transaction_id, tablet->tablet_id(), delete_bitmap, 
rowset_ids, rowset,
                 request.timeout, nullptr);
+    } else {
+        if (config::enable_cloud_make_rs_visible_on_be) {
+            engine.meta_mgr().cache_committed_rowset(rowset->rowset_meta(), 
context.txn_expiration);
+        }
     }
 
     return st;
diff --git a/be/src/cloud/cloud_delta_writer.cpp 
b/be/src/cloud/cloud_delta_writer.cpp
index 299182b45cb..d51d5d8b576 100644
--- a/be/src/cloud/cloud_delta_writer.cpp
+++ b/be/src/cloud/cloud_delta_writer.cpp
@@ -141,8 +141,8 @@ Status CloudDeltaWriter::_commit_empty_rowset() {
     return _engine.meta_mgr().commit_rowset(*rowset_meta(), "");
 }
 
-Status CloudDeltaWriter::set_txn_related_delete_bitmap() {
-    return rowset_builder()->set_txn_related_delete_bitmap();
+Status CloudDeltaWriter::set_txn_related_info() {
+    return rowset_builder()->set_txn_related_info();
 }
 
 } // namespace doris
diff --git a/be/src/cloud/cloud_delta_writer.h 
b/be/src/cloud/cloud_delta_writer.h
index 846149137b1..614bfd0f16a 100644
--- a/be/src/cloud/cloud_delta_writer.h
+++ b/be/src/cloud/cloud_delta_writer.h
@@ -51,7 +51,7 @@ public:
 
     Status commit_rowset();
 
-    Status set_txn_related_delete_bitmap();
+    Status set_txn_related_info();
     std::shared_ptr<ResourceContext> resource_context() { return 
_resource_ctx; }
 
 private:
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index f51b58bb757..bfa58af43d4 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -565,6 +565,14 @@ Status 
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
     using namespace std::chrono;
 
     TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", 
Status::OK(), tablet);
+    DBUG_EXECUTE_IF("CloudMetaMgr::sync_tablet_rowsets.before.inject_error", {
+        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+        auto target_table_id = dp->param<int64_t>("table_id", -1);
+        if (target_tablet_id == tablet->tablet_id() || target_table_id == 
tablet->table_id()) {
+            return Status::InternalError(
+                    "[sync_tablet_rowsets_unlocked] injected error for 
testing");
+        }
+    });
 
     MetaServiceProxy* proxy;
     RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
@@ -800,7 +808,7 @@ Status 
CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
     }
 }
 
-bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, 
int64_t old_max_version,
+bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet,
                                                       std::ranges::range 
auto&& rs_metas,
                                                       DeleteBitmap* 
delete_bitmap) {
     std::set<int64_t> txn_processed;
@@ -958,7 +966,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* 
tablet, int64_t old_
     }
 
     if (!full_sync && config::enable_sync_tablet_delete_bitmap_by_cache &&
-        sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, 
delete_bitmap)) {
+        sync_tablet_delete_bitmap_by_cache(tablet, rs_metas, delete_bitmap)) {
         if (sync_stats) {
             sync_stats->get_local_delete_bitmap_rowsets_num += rs_metas.size();
         }
@@ -1376,6 +1384,17 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, 
const std::string& job_i
     return st;
 }
 
+void CloudMetaMgr::cache_committed_rowset(RowsetMetaSharedPtr rs_meta, int64_t 
expiration_time) {
+    // For load-generated rowsets (job_id is empty), add to pending rowset 
manager
+    // so FE can notify BE to promote them later
+
+    // TODO(bobhan1): copy rs_meta?
+    int64_t txn_id = rs_meta->txn_id();
+    int64_t tablet_id = rs_meta->tablet_id();
+    
ExecEnv::GetInstance()->storage_engine().to_cloud().committed_rs_mgr().add_committed_rowset(
+            txn_id, tablet_id, std::move(rs_meta), expiration_time);
+}
+
 Status CloudMetaMgr::update_tmp_rowset(const RowsetMeta& rs_meta) {
     VLOG_DEBUG << "update committed rowset, tablet_id: " << rs_meta.tablet_id()
                << ", rowset_id: " << rs_meta.rowset_id();
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 2713ce12921..5bf54dde548 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -83,6 +83,7 @@ public:
 
     Status commit_rowset(RowsetMeta& rs_meta, const std::string& job_id,
                          std::shared_ptr<RowsetMeta>* existed_rs_meta = 
nullptr);
+    void cache_committed_rowset(RowsetMetaSharedPtr rs_meta, int64_t 
expiration_time);
 
     Status update_tmp_rowset(const RowsetMeta& rs_meta);
 
@@ -176,8 +177,7 @@ public:
                               std::string* my_cluster_id = nullptr);
 
 private:
-    bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t 
old_max_version,
-                                            std::ranges::range auto&& rs_metas,
+    bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, 
std::ranges::range auto&& rs_metas,
                                             DeleteBitmap* delete_bitmap);
 
     Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t 
old_max_version,
diff --git a/be/src/cloud/cloud_rowset_builder.cpp 
b/be/src/cloud/cloud_rowset_builder.cpp
index 0ce4829d9fe..1f73d11d20e 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -43,10 +43,12 @@ Status CloudRowsetBuilder::init() {
 
     std::shared_ptr<MowContext> mow_context;
     if (_tablet->enable_unique_key_merge_on_write()) {
-        auto st = 
std::static_pointer_cast<CloudTablet>(_tablet)->sync_rowsets();
-        // sync_rowsets will return INVALID_TABLET_STATE when tablet is under 
alter
-        if (!st.ok() && !st.is<ErrorCode::INVALID_TABLET_STATE>()) {
-            return st;
+        if (config::cloud_mow_sync_rowsets_when_load_txn_begin) {
+            auto st = 
std::static_pointer_cast<CloudTablet>(_tablet)->sync_rowsets();
+            // sync_rowsets will return INVALID_TABLET_STATE when tablet is 
under alter
+            if (!st.ok() && !st.is<ErrorCode::INVALID_TABLET_STATE>()) {
+                return st;
+            }
         }
         RETURN_IF_ERROR(init_mow_context(mow_context));
     }
@@ -130,7 +132,7 @@ const RowsetMetaSharedPtr& 
CloudRowsetBuilder::rowset_meta() {
     return _rowset_writer->rowset_meta();
 }
 
-Status CloudRowsetBuilder::set_txn_related_delete_bitmap() {
+Status CloudRowsetBuilder::set_txn_related_info() {
     if (_tablet->enable_unique_key_merge_on_write()) {
         // For empty rowsets when skip_writing_empty_rowset_metadata=true,
         // store only a lightweight marker instead of full rowset info.
@@ -156,6 +158,15 @@ Status CloudRowsetBuilder::set_txn_related_delete_bitmap() 
{
         _engine.txn_delete_bitmap_cache().set_tablet_txn_info(
                 _req.txn_id, _tablet->tablet_id(), _delete_bitmap, 
*_rowset_ids, _rowset,
                 _req.txn_expiration, _partial_update_info);
+    } else {
+        if (config::enable_cloud_make_rs_visible_on_be) {
+            if (_skip_writing_rowset_metadata) {
+                _engine.committed_rs_mgr().mark_empty_rowset(_req.txn_id, 
_tablet->tablet_id(),
+                                                             
_req.txn_expiration);
+            } else {
+                _engine.meta_mgr().cache_committed_rowset(rowset_meta(), 
_req.txn_expiration);
+            }
+        }
     }
     return Status::OK();
 }
diff --git a/be/src/cloud/cloud_rowset_builder.h 
b/be/src/cloud/cloud_rowset_builder.h
index 3384f235167..cec8cfed979 100644
--- a/be/src/cloud/cloud_rowset_builder.h
+++ b/be/src/cloud/cloud_rowset_builder.h
@@ -37,7 +37,7 @@ public:
 
     const RowsetMetaSharedPtr& rowset_meta();
 
-    Status set_txn_related_delete_bitmap();
+    Status set_txn_related_info();
 
     void set_skip_writing_rowset_metadata(bool skip) { 
_skip_writing_rowset_metadata = skip; }
 
diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 432c1fde72f..927d5bef343 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -223,6 +223,9 @@ Status CloudStorageEngine::open() {
                     : config::delete_bitmap_agg_cache_capacity);
     RETURN_IF_ERROR(_txn_delete_bitmap_cache->init());
 
+    _committed_rs_mgr = std::make_unique<CloudCommittedRSMgr>();
+    RETURN_IF_ERROR(_committed_rs_mgr->init());
+
     _file_cache_block_downloader = 
std::make_unique<io::FileCacheBlockDownloader>(*this);
 
     _cloud_warm_up_manager = std::make_unique<CloudWarmUpManager>(*this);
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index 7d0a3e61296..68626ec0d9e 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -24,6 +24,7 @@
 //#include "cloud/cloud_cumulative_compaction.h"
 //#include "cloud/cloud_base_compaction.h"
 //#include "cloud/cloud_full_compaction.h"
+#include "cloud/cloud_committed_rs_mgr.h"
 #include "cloud/cloud_cumulative_compaction_policy.h"
 #include "cloud/cloud_tablet.h"
 #include "cloud/cloud_txn_delete_bitmap_cache.h"
@@ -94,6 +95,8 @@ public:
 
     CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return 
*_txn_delete_bitmap_cache; }
 
+    CloudCommittedRSMgr& committed_rs_mgr() const { return *_committed_rs_mgr; 
}
+
     ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
         return *_calc_tablet_delete_bitmap_task_thread_pool;
     }
@@ -214,6 +217,7 @@ private:
     std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr;
     std::unique_ptr<CloudTabletMgr> _tablet_mgr;
     std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache;
+    std::unique_ptr<CloudCommittedRSMgr> _committed_rs_mgr;
     std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool;
     std::unique_ptr<ThreadPool> _sync_delete_bitmap_thread_pool;
 
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 9936ac374f6..faa9a82ae6a 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -27,6 +27,7 @@
 #include <rapidjson/rapidjson.h>
 #include <rapidjson/stringbuffer.h>
 
+#include <algorithm>
 #include <atomic>
 #include <chrono>
 #include <cstdint>
@@ -41,6 +42,7 @@
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet_mgr.h"
 #include "cloud/cloud_warm_up_manager.h"
+#include "cloud/config.h"
 #include "common/cast_set.h"
 #include "common/config.h"
 #include "common/logging.h"
@@ -1829,5 +1831,172 @@ void 
CloudTablet::_add_rowsets_directly(std::vector<RowsetSharedPtr>& rowsets,
     _tablet_meta->add_rowsets_unchecked(rowsets);
 }
 
+void CloudTablet::clear_unused_visible_pending_rowsets() {
+    int64_t cur_max_version = max_version().second;
+    int32_t max_version_count = max_version_config();
+    int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
+                                   
std::chrono::system_clock::now().time_since_epoch())
+                                   .count();
+
+    std::unique_lock<std::mutex> wlock(_visible_pending_rs_lock);
+    for (auto it = _visible_pending_rs_map.begin(); it != 
_visible_pending_rs_map.end();) {
+        if (int64_t version = it->first, expiration_time = 
it->second.expiration_time;
+            version <= cur_max_version || expiration_time < current_time) {
+            it = _visible_pending_rs_map.erase(it);
+        } else {
+            ++it;
+        }
+    }
+
+    while (!_visible_pending_rs_map.empty() && _visible_pending_rs_map.size() 
> max_version_count) {
+        _visible_pending_rs_map.erase(--_visible_pending_rs_map.end());
+    }
+}
+
+void CloudTablet::try_make_committed_rs_visible(int64_t txn_id, int64_t 
visible_version,
+                                                int64_t 
version_update_time_ms) {
+    if (enable_unique_key_merge_on_write()) {
+        // for mow tablet, we get committed rowset from 
`CloudTxnDeleteBitmapCache` rather than `CommittedRowsetManager`
+        try_make_committed_rs_visible_for_mow(txn_id, visible_version, 
version_update_time_ms);
+        return;
+    }
+
+    auto& committed_rs_mgr = _engine.committed_rs_mgr();
+    auto res = committed_rs_mgr.get_committed_rowset(txn_id, tablet_id());
+    if (!res.has_value()) {
+        return;
+    }
+    auto [rowset_meta, expiration_time] = res.value();
+    bool is_empty_rowset = (rowset_meta == nullptr);
+    if (!is_empty_rowset) {
+        rowset_meta->set_cloud_fields_after_visible(visible_version, 
version_update_time_ms);
+    }
+    {
+        std::lock_guard<std::mutex> lock(_visible_pending_rs_lock);
+        _visible_pending_rs_map.emplace(
+                visible_version,
+                VisiblePendingRowset {rowset_meta, expiration_time, 
is_empty_rowset});
+    }
+    apply_visible_pending_rowsets();
+    committed_rs_mgr.remove_committed_rowset(txn_id, tablet_id());
+}
+
+void CloudTablet::try_make_committed_rs_visible_for_mow(int64_t txn_id, 
int64_t visible_version,
+                                                        int64_t 
version_update_time_ms) {
+    Defer defer {[&] {
+        
_engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, 
tablet_id());
+    }};
+    auto res = 
_engine.txn_delete_bitmap_cache().get_rowset_and_delete_bitmap(txn_id, 
tablet_id());
+    if (!res.has_value()) {
+        return;
+    }
+    auto [rowset, delete_bitmap] = res.value();
+    bool is_empty_rowset = (rowset == nullptr);
+    {
+        std::unique_lock lock {_sync_meta_lock};
+        std::unique_lock meta_wlock {_meta_lock};
+        if (_max_version + 1 != visible_version) {
+            return;
+        }
+        if (is_empty_rowset) {
+            Versions existing_versions;
+            for (const auto& [_, rs] : tablet_meta()->all_rs_metas()) {
+                existing_versions.emplace_back(rs->version());
+            }
+            if (existing_versions.empty()) {
+                return;
+            }
+            auto max_version = std::ranges::max(existing_versions, {}, 
&Version::first);
+            auto prev_rowset = get_rowset_by_version(max_version);
+            auto st = _engine.meta_mgr().create_empty_rowset_for_hole(
+                    this, visible_version, prev_rowset->rowset_meta(), 
&rowset);
+            if (!st.ok()) {
+                return;
+            }
+        } else {
+            for (const auto& [delete_bitmap_key, bitmap_value] : 
delete_bitmap->delete_bitmap) {
+                // skip sentinel mark, which is used for delete bitmap 
correctness check
+                if (std::get<1>(delete_bitmap_key) != 
DeleteBitmap::INVALID_SEGMENT_ID) {
+                    tablet_meta()->delete_bitmap().merge(
+                            {std::get<0>(delete_bitmap_key), 
std::get<1>(delete_bitmap_key),
+                             visible_version},
+                            bitmap_value);
+                }
+            }
+        }
+        rowset->rowset_meta()->set_cloud_fields_after_visible(visible_version,
+                                                              
version_update_time_ms);
+        add_rowsets({rowset}, false, meta_wlock, true);
+    }
+    LOG(INFO) << "mow added visible pending rowset, txn_id=" << txn_id
+              << ", tablet_id=" << tablet_id() << ", version=" << 
visible_version
+              << ", rowset_id=" << rowset->rowset_id().to_string();
+}
+
+void CloudTablet::apply_visible_pending_rowsets() {
+    Defer defer {[&] { clear_unused_visible_pending_rowsets(); }};
+
+    std::unique_lock lock(_sync_meta_lock);
+    std::unique_lock<std::shared_mutex> meta_wlock(_meta_lock);
+    int64_t next_version = _max_version + 1;
+    std::vector<RowsetSharedPtr> to_add;
+    std::lock_guard<std::mutex> pending_lock(_visible_pending_rs_lock);
+    for (auto it = _visible_pending_rs_map.upper_bound(_max_version);
+         it != _visible_pending_rs_map.end(); ++it) {
+        int64_t version = it->first;
+        if (version != next_version) break;
+
+        auto& pending_rs = it->second;
+        if (pending_rs.is_empty_rowset) {
+            RowsetSharedPtr prev_rowset {nullptr};
+            if (!to_add.empty()) {
+                prev_rowset = to_add.back();
+            } else {
+                Versions existing_versions;
+                for (const auto& [_, rs] : tablet_meta()->all_rs_metas()) {
+                    existing_versions.emplace_back(rs->version());
+                }
+                if (existing_versions.empty()) {
+                    break;
+                }
+                auto max_version = std::ranges::max(existing_versions, {}, 
&Version::first);
+                prev_rowset = get_rowset_by_version(max_version);
+            }
+            RowsetSharedPtr rowset;
+            auto st = _engine.meta_mgr().create_empty_rowset_for_hole(
+                    this, version, prev_rowset->rowset_meta(), &rowset);
+            if (!st.ok()) {
+                return;
+            }
+            to_add.push_back(std::move(rowset));
+        } else {
+            RowsetSharedPtr rowset;
+            auto st = RowsetFactory::create_rowset(nullptr, "", 
pending_rs.rowset_meta, &rowset);
+            if (!st.ok()) {
+                LOG(WARNING) << "failed to create rowset from pending rowset 
meta, tablet_id="
+                             << tablet_id() << ", version=" << version
+                             << ", rowset_id=" << 
pending_rs.rowset_meta->rowset_id().to_string()
+                             << ", error=" << st;
+                break;
+            }
+            to_add.push_back(std::move(rowset));
+        }
+        next_version++;
+    }
+    if (!to_add.empty()) {
+        add_rowsets(to_add, false, meta_wlock, true);
+        LOG_INFO(
+                "applied_visible_pending_rowsets, tablet_id={}, 
new_max_version={}, "
+                "count={}, new_rowsets={}",
+                tablet_id(), _max_version, to_add.size(),
+                fmt::join(to_add | std::views::transform([](const 
RowsetSharedPtr& rs) {
+                              return fmt::format("{}{}", 
rs->rowset_id().to_string(),
+                                                 rs->version().to_string());
+                          }),
+                          ","));
+    }
+}
+
 #include "common/compile_check_end.h"
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index e7a9b13e851..889d92808f2 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -368,6 +368,18 @@ public:
 
     void add_warmed_up_rowset(const RowsetId& rowset_id);
 
+    // Try to apply visible pending rowsets to tablet meta in version order
+    // This should be called after receiving FE notification or when new 
rowsets are added
+    // @return Status::OK() if successfully applied, error otherwise
+    void apply_visible_pending_rowsets();
+
+    void try_make_committed_rs_visible(int64_t txn_id, int64_t visible_version,
+                                       int64_t version_update_time_ms);
+    void try_make_committed_rs_visible_for_mow(int64_t txn_id, int64_t 
visible_version,
+                                               int64_t version_update_time_ms);
+
+    void clear_unused_visible_pending_rowsets();
+
     std::string rowset_warmup_digest() const {
         std::string res;
         auto add_log = [&](const RowsetSharedPtr& rs) {
@@ -507,6 +519,23 @@ private:
     mutable std::shared_mutex _cluster_info_mutex;
     std::string _last_active_cluster_id;
     int64_t _last_active_time_ms {0};
+
+    // Map: version -> <rowset_meta, expiration_time>
+    // Stores rowsets that have been notified by FE but not yet added to 
tablet meta
+    // due to out-of-order notification or version discontinuity
+    struct VisiblePendingRowset {
+        const bool is_empty_rowset;
+        const int64_t expiration_time; // seconds since epoch
+        RowsetMetaSharedPtr rowset_meta;
+
+        VisiblePendingRowset(RowsetMetaSharedPtr rowset_meta_, int64_t 
expiration_time_,
+                             bool is_empty_rowset_ = false)
+                : is_empty_rowset(is_empty_rowset_),
+                  expiration_time(expiration_time_),
+                  rowset_meta(std::move(rowset_meta_)) {}
+    };
+    mutable std::mutex _visible_pending_rs_lock;
+    std::map<int64_t, VisiblePendingRowset> _visible_pending_rs_map;
 };
 
 using CloudTabletSPtr = std::shared_ptr<CloudTablet>;
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp 
b/be/src/cloud/cloud_tablet_mgr.cpp
index 8c5f14fd7dc..3e979864138 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -360,6 +360,10 @@ void CloudTabletMgr::vacuum_stale_rowsets(const 
CountDownLatch& stop_latch) {
                       << ", tablet_id=" << 
tablet_id_with_max_useless_rowset_version_count;
         }
     }
+    {
+        _tablet_map->traverse(
+                [](auto&& tablet) { 
tablet->clear_unused_visible_pending_rowsets(); });
+    }
 }
 
 std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() {
diff --git a/be/src/cloud/cloud_tablets_channel.cpp 
b/be/src/cloud/cloud_tablets_channel.cpp
index 173b8654696..ff01f2c5858 100644
--- a/be/src/cloud/cloud_tablets_channel.cpp
+++ b/be/src/cloud/cloud_tablets_channel.cpp
@@ -250,9 +250,9 @@ Status CloudTabletsChannel::close(LoadChannel* parent, 
const PTabletWriterAddBlo
         }
     }
 
-    // 6. set txn related delete bitmap if necessary
+    // 6. set txn related info if necessary
     for (auto it = writers_to_commit.begin(); it != writers_to_commit.end();) {
-        auto st = (*it)->set_txn_related_delete_bitmap();
+        auto st = (*it)->set_txn_related_info();
         if (!st.ok()) {
             _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
             _close_status = std::move(st);
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp 
b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
index f5a210feebc..b6349067b87 100644
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
@@ -27,6 +27,7 @@
 #include "common/status.h"
 #include "cpp/sync_point.h"
 #include "storage/olap_common.h"
+#include "storage/rowset/rowset_fwd.h"
 #include "storage/tablet/tablet_meta.h"
 #include "storage/txn/txn_manager.h"
 
@@ -96,6 +97,46 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
     return st;
 }
 
+Result<std::pair<RowsetSharedPtr, DeleteBitmapPtr>>
+CloudTxnDeleteBitmapCache::get_rowset_and_delete_bitmap(TTransactionId 
transaction_id,
+                                                        int64_t tablet_id) {
+    RowsetSharedPtr rowset;
+    {
+        std::shared_lock<std::shared_mutex> rlock(_rwlock);
+        TxnKey txn_key(transaction_id, tablet_id);
+        if (_empty_rowset_markers.contains(txn_key)) {
+            return std::make_pair(nullptr, nullptr);
+        }
+        auto iter = _txn_map.find(txn_key);
+        if (iter == _txn_map.end()) {
+            return ResultError(Status::InternalError<false>(""));
+        }
+        if (!(iter->second.publish_status &&
+              *(iter->second.publish_status) == PublishStatus::SUCCEED)) {
+            return ResultError(Status::InternalError<false>(""));
+        }
+        rowset = iter->second.rowset;
+    }
+
+    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
+    CacheKey key(key_str);
+    Cache::Handle* handle = lookup(key);
+
+    DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", 
{
+        handle = nullptr;
+        LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, 
make cache missed "
+                     "when get delete bitmap, txn_id:"
+                  << transaction_id << ", tablet_id: " << tablet_id;
+    });
+    DeleteBitmapCacheValue* val =
+            handle == nullptr ? nullptr : 
reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
+    if (!val) {
+        return ResultError(Status::InternalError<false>(""));
+    }
+    Defer defer {[this, handle] { release(handle); }};
+    return std::make_pair(rowset, val->delete_bitmap);
+}
+
 Status CloudTxnDeleteBitmapCache::get_delete_bitmap(
         TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr* 
delete_bitmap,
         RowsetIdUnorderedSet* rowset_ids, std::shared_ptr<PublishStatus>* 
publish_status) {
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h 
b/be/src/cloud/cloud_txn_delete_bitmap_cache.h
index 4274cb1b439..7cf6c27ecd8 100644
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h
@@ -77,6 +77,10 @@ public:
                              DeleteBitmapPtr* delete_bitmap, 
RowsetIdUnorderedSet* rowset_ids,
                              std::shared_ptr<PublishStatus>* publish_status);
 
+    // the caller should guarantee that the txn `transaction_id` has been 
published successfully in MS
+    Result<std::pair<RowsetSharedPtr, DeleteBitmapPtr>> 
get_rowset_and_delete_bitmap(
+            TTransactionId transaction_id, int64_t tablet_id);
+
 private:
     void _clean_thread_callback();
 
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9d692c4ef0e..97c64a9b7a5 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -251,6 +251,8 @@ DEFINE_Int32(num_query_ctx_map_partitions, "128");
 DEFINE_Int32(make_snapshot_worker_count, "5");
 // the count of thread to release snapshot
 DEFINE_Int32(release_snapshot_worker_count, "5");
+// the count of thread to make committed rowsets visible in cloud mode
+DEFINE_Int32(cloud_make_committed_rs_visible_worker_count, "16");
 // report random wait a little time to avoid FE receiving multiple be reports 
at the same time.
 // do not set it to false for production environment
 DEFINE_mBool(report_random_wait, "true");
@@ -1712,6 +1714,10 @@ DEFINE_mInt32(concurrency_stats_dump_interval_ms, "100");
 DEFINE_Validator(concurrency_stats_dump_interval_ms,
                  [](const int32_t config) -> bool { return config >= 10; });
 
+DEFINE_mBool(cloud_mow_sync_rowsets_when_load_txn_begin, "true");
+
+DEFINE_mBool(enable_cloud_make_rs_visible_on_be, "false");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index b1f76c32fea..e3fcefcb6ac 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -308,6 +308,8 @@ DECLARE_Int32(download_worker_count);
 DECLARE_Int32(make_snapshot_worker_count);
 // the count of thread to release snapshot
 DECLARE_Int32(release_snapshot_worker_count);
+// the count of thread to make committed rowsets visible in cloud mode
+DECLARE_Int32(cloud_make_committed_rs_visible_worker_count);
 // report random wait a little time to avoid FE receiving multiple be reports 
at the same time.
 // do not set it to false for production environment
 DECLARE_mBool(report_random_wait);
@@ -1792,6 +1794,10 @@ DECLARE_mString(aws_credentials_provider_version);
 DECLARE_mBool(enable_concurrency_stats_dump);
 DECLARE_mInt32(concurrency_stats_dump_interval_ms);
 
+DECLARE_mBool(cloud_mow_sync_rowsets_when_load_txn_begin);
+
+DECLARE_mBool(enable_cloud_make_rs_visible_on_be);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/storage/rowset/rowset_meta.h 
b/be/src/storage/rowset/rowset_meta.h
index 90b21ed1aa6..83a908613c5 100644
--- a/be/src/storage/rowset/rowset_meta.h
+++ b/be/src/storage/rowset/rowset_meta.h
@@ -395,11 +395,9 @@ public:
         }
         return system_clock::from_time_t(newest_write_timestamp());
     }
-#ifdef BE_TEST
     void set_visible_ts_ms(int64_t visible_ts_ms) {
         _rowset_meta_pb.set_visible_ts_ms(visible_ts_ms);
     }
-#endif
 
     void set_tablet_schema(const TabletSchemaSPtr& tablet_schema);
     void set_tablet_schema(const TabletSchemaPB& tablet_schema);
@@ -462,6 +460,16 @@ public:
                 [algorithm]() -> Result<EncryptionAlgorithmPB> { return 
algorithm; });
     }
 
+    void set_cloud_fields_after_visible(int64_t visible_version, int64_t 
version_update_time_ms) {
+        // Update rowset meta with correct version and visible_ts
+        // !!ATTENTION!!: this code should be updated if there are more fields
+        // in rowset meta which will be modified in meta-service when 
commit_txn in the future
+        set_version({visible_version, visible_version});
+        if (version_update_time_ms > 0) {
+            set_visible_ts_ms(version_update_time_ms);
+        }
+    }
+
 private:
     bool _deserialize_from_pb(std::string_view value);
 
diff --git a/be/test/cloud/cloud_committed_rs_mgr_test.cpp 
b/be/test/cloud/cloud_committed_rs_mgr_test.cpp
new file mode 100644
index 00000000000..9df6597b8a6
--- /dev/null
+++ b/be/test/cloud/cloud_committed_rs_mgr_test.cpp
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_committed_rs_mgr.h"
+
+#include <gtest/gtest.h>
+
+#include <chrono>
+#include <memory>
+#include <thread>
+
+#include "cloud/config.h"
+#include "storage/rowset/rowset_meta.h"
+
+namespace doris {
+
+class CloudCommittedRSMgrTest : public testing::Test {
+protected:
+    void SetUp() override {
+        _mgr = std::make_unique<CloudCommittedRSMgr>();
+        // Do not call init() to avoid starting background cleanup thread
+    }
+
+    void TearDown() override { _mgr.reset(); }
+
+    RowsetMetaSharedPtr create_rowset_meta(int64_t tablet_id, int64_t txn_id,
+                                           int64_t rowset_id_val = 0) {
+        RowsetMetaPB rowset_meta_pb;
+        rowset_meta_pb.set_tablet_id(tablet_id);
+        rowset_meta_pb.set_txn_id(txn_id);
+        rowset_meta_pb.set_num_segments(1);
+        rowset_meta_pb.set_num_rows(100);
+        rowset_meta_pb.set_total_disk_size(1024);
+        rowset_meta_pb.set_data_disk_size(512);
+
+        RowsetId rowset_id;
+        if (rowset_id_val == 0) {
+            rowset_id.init(txn_id);
+        } else {
+            rowset_id.init(rowset_id_val);
+        }
+        rowset_meta_pb.set_rowset_id(0);
+        rowset_meta_pb.set_rowset_id_v2(rowset_id.to_string());
+
+        auto rowset_meta = std::make_shared<RowsetMeta>();
+        rowset_meta->init_from_pb(rowset_meta_pb);
+        return rowset_meta;
+    }
+
+    int64_t current_time_seconds() {
+        return std::chrono::duration_cast<std::chrono::seconds>(
+                       std::chrono::system_clock::now().time_since_epoch())
+                .count();
+    }
+
+protected:
+    std::unique_ptr<CloudCommittedRSMgr> _mgr;
+};
+
+TEST_F(CloudCommittedRSMgrTest, TestAddAndGetCommittedRowset) {
+    int64_t txn_id = 1000;
+    int64_t tablet_id = 2000;
+    auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+    int64_t expiration_time = current_time_seconds() + 3600;
+
+    // Add committed rowset
+    _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, 
expiration_time);
+
+    // Get committed rowset
+    auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(result.has_value());
+    auto [retrieved_meta, retrieved_expiration] = result.value();
+    ASSERT_NE(retrieved_meta, nullptr);
+    EXPECT_EQ(retrieved_meta->tablet_id(), tablet_id);
+    EXPECT_EQ(retrieved_meta->txn_id(), txn_id);
+    EXPECT_EQ(retrieved_meta->rowset_id().to_string(), 
rowset_meta->rowset_id().to_string());
+    EXPECT_EQ(retrieved_expiration, expiration_time);
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestGetNonExistentRowset) {
+    int64_t txn_id = 1000;
+    int64_t tablet_id = 2000;
+
+    auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    EXPECT_FALSE(result.has_value());
+    EXPECT_TRUE(result.error().is<ErrorCode::NOT_FOUND>());
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestRemoveCommittedRowset) {
+    int64_t txn_id = 1000;
+    int64_t tablet_id = 2000;
+    auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+    int64_t expiration_time = current_time_seconds() + 3600;
+
+    // Add committed rowset
+    _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, 
expiration_time);
+
+    // Verify it exists
+    auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(result.has_value());
+
+    // Remove it
+    _mgr->remove_committed_rowset(txn_id, tablet_id);
+
+    // Verify it's gone
+    result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    EXPECT_FALSE(result.has_value());
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestRemoveExpiredCommittedRowsets) {
+    // Save original config value
+    int32_t original_min_expired_seconds = 
config::tablet_txn_info_min_expired_seconds;
+    // Set min expiration to 0 to allow testing of past expiration times
+    config::tablet_txn_info_min_expired_seconds = -100;
+
+    int64_t current_time = current_time_seconds();
+
+    // Add expired rowset
+    int64_t txn_id_1 = 1000;
+    int64_t tablet_id_1 = 2000;
+    auto rowset_meta_1 = create_rowset_meta(tablet_id_1, txn_id_1);
+    int64_t expiration_time_1 = current_time - 10; // Already expired
+    _mgr->add_committed_rowset(txn_id_1, tablet_id_1, rowset_meta_1, 
expiration_time_1);
+
+    // Add non-expired rowset
+    int64_t txn_id_2 = 1001;
+    int64_t tablet_id_2 = 2001;
+    auto rowset_meta_2 = create_rowset_meta(tablet_id_2, txn_id_2);
+    int64_t expiration_time_2 = current_time + 3600; // Not expired
+    _mgr->add_committed_rowset(txn_id_2, tablet_id_2, rowset_meta_2, 
expiration_time_2);
+
+    // Verify both exist
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_1, tablet_id_1).has_value());
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_2, tablet_id_2).has_value());
+
+    // Remove expired rowsets
+    _mgr->remove_expired_committed_rowsets();
+
+    // Verify expired rowset is removed
+    EXPECT_FALSE(_mgr->get_committed_rowset(txn_id_1, 
tablet_id_1).has_value());
+    // Verify non-expired rowset still exists
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_2, tablet_id_2).has_value());
+
+    // Restore config
+    config::tablet_txn_info_min_expired_seconds = original_min_expired_seconds;
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestMarkAndCheckEmptyRowset) {
+    int64_t txn_id = 1000;
+    int64_t tablet_id = 2000;
+    int64_t txn_expiration = current_time_seconds() + 3600;
+
+    // Initially not marked as empty
+    auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    EXPECT_FALSE(result.has_value());
+
+    // Mark as empty
+    _mgr->mark_empty_rowset(txn_id, tablet_id, txn_expiration);
+
+    // Check it's marked as empty
+    result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(result.has_value());
+    auto [retrieved_meta, retrieved_expiration] = result.value();
+    EXPECT_EQ(retrieved_meta, nullptr);
+    EXPECT_EQ(retrieved_expiration, txn_expiration);
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestEmptyRowsetExpiration) {
+    // Save original config value
+    int32_t original_min_expired_seconds = 
config::tablet_txn_info_min_expired_seconds;
+    // Set min expiration to 0 to allow testing of past expiration times
+    config::tablet_txn_info_min_expired_seconds = -100;
+
+    int64_t current_time = current_time_seconds();
+
+    // Mark as empty with past expiration
+    int64_t txn_id = 1000;
+    int64_t tablet_id = 2000;
+    int64_t txn_expiration = current_time - 10; // Already expired
+    _mgr->mark_empty_rowset(txn_id, tablet_id, txn_expiration);
+
+    // Verify it's marked
+    auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(result.has_value());
+    EXPECT_EQ(result.value().first, nullptr);
+
+    // Remove expired rowsets
+    _mgr->remove_expired_committed_rowsets();
+
+    // Verify it's removed
+    result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    EXPECT_FALSE(result.has_value());
+
+    // Restore config
+    config::tablet_txn_info_min_expired_seconds = original_min_expired_seconds;
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestMultipleRowsets) {
+    int64_t expiration_time = current_time_seconds() + 3600;
+
+    // Add multiple rowsets for different tablets and transactions
+    for (int i = 0; i < 10; i++) {
+        int64_t txn_id = 1000 + i;
+        int64_t tablet_id = 2000 + i;
+        auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+        _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, 
expiration_time);
+    }
+
+    // Verify all rowsets can be retrieved
+    for (int i = 0; i < 10; i++) {
+        int64_t txn_id = 1000 + i;
+        int64_t tablet_id = 2000 + i;
+        auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+        ASSERT_TRUE(result.has_value());
+        auto [retrieved_meta, retrieved_expiration] = result.value();
+        EXPECT_EQ(retrieved_meta->tablet_id(), tablet_id);
+        EXPECT_EQ(retrieved_meta->txn_id(), txn_id);
+    }
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestSameTransactionDifferentTablets) {
+    int64_t txn_id = 1000;
+    int64_t expiration_time = current_time_seconds() + 3600;
+
+    // Add same txn_id for different tablets
+    for (int i = 0; i < 5; i++) {
+        int64_t tablet_id = 2000 + i;
+        auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+        _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, 
expiration_time);
+    }
+
+    // Verify all can be retrieved independently
+    for (int i = 0; i < 5; i++) {
+        int64_t tablet_id = 2000 + i;
+        auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+        ASSERT_TRUE(result.has_value());
+        auto [retrieved_meta, retrieved_expiration] = result.value();
+        EXPECT_EQ(retrieved_meta->tablet_id(), tablet_id);
+        EXPECT_EQ(retrieved_meta->txn_id(), txn_id);
+    }
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestMinExpirationTime) {
+    // Save original config value
+    int64_t original_min_expired_seconds = 
config::tablet_txn_info_min_expired_seconds;
+
+    // Set min expiration to 100 seconds
+    config::tablet_txn_info_min_expired_seconds = 100;
+
+    int64_t txn_id = 1000;
+    int64_t tablet_id = 2000;
+    auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+
+    // Try to set expiration time less than min
+    int64_t current_time = current_time_seconds();
+    int64_t short_expiration = current_time + 10; // Less than min
+
+    _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, 
short_expiration);
+
+    // Get and verify expiration is at least min
+    auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(result.has_value());
+    auto [retrieved_meta, retrieved_expiration] = result.value();
+    EXPECT_GE(retrieved_expiration, current_time + 
config::tablet_txn_info_min_expired_seconds);
+
+    // Restore config
+    config::tablet_txn_info_min_expired_seconds = original_min_expired_seconds;
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestMixedRowsetsAndEmptyMarkers) {
+    int64_t expiration_time = current_time_seconds() + 3600;
+
+    // Add some normal rowsets
+    for (int i = 0; i < 5; i++) {
+        int64_t txn_id = 1000 + i;
+        int64_t tablet_id = 2000 + i;
+        auto rowset_meta = create_rowset_meta(tablet_id, txn_id);
+        _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta, 
expiration_time);
+    }
+
+    // Add some empty markers
+    for (int i = 5; i < 10; i++) {
+        int64_t txn_id = 1000 + i;
+        int64_t tablet_id = 2000 + i;
+        _mgr->mark_empty_rowset(txn_id, tablet_id, expiration_time);
+    }
+
+    // Verify normal rowsets
+    for (int i = 0; i < 5; i++) {
+        int64_t txn_id = 1000 + i;
+        int64_t tablet_id = 2000 + i;
+        auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+        ASSERT_TRUE(result.has_value());
+        auto [retrieved_meta, retrieved_expiration] = result.value();
+        EXPECT_NE(retrieved_meta, nullptr);
+    }
+
+    // Verify empty markers
+    for (int i = 5; i < 10; i++) {
+        int64_t txn_id = 1000 + i;
+        int64_t tablet_id = 2000 + i;
+        auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+        ASSERT_TRUE(result.has_value());
+        auto [retrieved_meta, retrieved_expiration] = result.value();
+        EXPECT_EQ(retrieved_meta, nullptr);
+    }
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestExpiredRowsetsCleanupWithMixedTypes) {
+    // Save original config value
+    int64_t original_min_expired_seconds = 
config::tablet_txn_info_min_expired_seconds;
+    // Set min expiration to 0 to allow testing of past expiration times
+    config::tablet_txn_info_min_expired_seconds = 0;
+
+    int64_t current_time = current_time_seconds();
+
+    // Add expired normal rowset
+    int64_t txn_id_1 = 1000;
+    int64_t tablet_id_1 = 2000;
+    auto rowset_meta_1 = create_rowset_meta(tablet_id_1, txn_id_1);
+    _mgr->add_committed_rowset(txn_id_1, tablet_id_1, rowset_meta_1, 
current_time - 10);
+
+    // Add expired empty marker
+    int64_t txn_id_2 = 1001;
+    int64_t tablet_id_2 = 2001;
+    _mgr->mark_empty_rowset(txn_id_2, tablet_id_2, current_time - 10);
+
+    // Add non-expired normal rowset
+    int64_t txn_id_3 = 1002;
+    int64_t tablet_id_3 = 2002;
+    auto rowset_meta_3 = create_rowset_meta(tablet_id_3, txn_id_3);
+    _mgr->add_committed_rowset(txn_id_3, tablet_id_3, rowset_meta_3, 
current_time + 3600);
+
+    // Add non-expired empty marker
+    int64_t txn_id_4 = 1003;
+    int64_t tablet_id_4 = 2003;
+    _mgr->mark_empty_rowset(txn_id_4, tablet_id_4, current_time + 3600);
+
+    // Verify all exist
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_1, tablet_id_1).has_value());
+    auto result_2 = _mgr->get_committed_rowset(txn_id_2, tablet_id_2);
+    EXPECT_TRUE(result_2.has_value());
+    EXPECT_EQ(result_2.value().first, nullptr);
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_3, tablet_id_3).has_value());
+    auto result_4 = _mgr->get_committed_rowset(txn_id_4, tablet_id_4);
+    EXPECT_TRUE(result_4.has_value());
+    EXPECT_EQ(result_4.value().first, nullptr);
+
+    // Remove expired
+    _mgr->remove_expired_committed_rowsets();
+
+    // Verify expired are removed
+    EXPECT_FALSE(_mgr->get_committed_rowset(txn_id_1, 
tablet_id_1).has_value());
+    EXPECT_FALSE(_mgr->get_committed_rowset(txn_id_2, 
tablet_id_2).has_value());
+
+    // Verify non-expired still exist
+    EXPECT_TRUE(_mgr->get_committed_rowset(txn_id_3, tablet_id_3).has_value());
+    result_4 = _mgr->get_committed_rowset(txn_id_4, tablet_id_4);
+    EXPECT_TRUE(result_4.has_value());
+    EXPECT_EQ(result_4.value().first, nullptr);
+
+    // Restore config
+    config::tablet_txn_info_min_expired_seconds = original_min_expired_seconds;
+}
+
+TEST_F(CloudCommittedRSMgrTest, TestUpdateSameRowset) {
+    int64_t txn_id = 1000;
+    int64_t tablet_id = 2000;
+    int64_t expiration_time_1 = current_time_seconds() + 1800;
+    int64_t expiration_time_2 = current_time_seconds() + 3600;
+
+    // Add rowset first time
+    auto rowset_meta_1 = create_rowset_meta(tablet_id, txn_id, 10001);
+    _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta_1, 
expiration_time_1);
+
+    // Verify first rowset is added
+    auto result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(result.has_value());
+    auto [retrieved_meta_1, retrieved_expiration_1] = result.value();
+    EXPECT_EQ(retrieved_meta_1->rowset_id().to_string(), 
rowset_meta_1->rowset_id().to_string());
+
+    // Add same txn_id and tablet_id again with different rowset and expiration
+    // Due to using insert_or_assign(), the second insert should overwrite the 
first one
+    auto rowset_meta_2 = create_rowset_meta(tablet_id, txn_id, 10002);
+    _mgr->add_committed_rowset(txn_id, tablet_id, rowset_meta_2, 
expiration_time_2);
+
+    // Get and verify it's the second one (insert_or_assign overwrites)
+    result = _mgr->get_committed_rowset(txn_id, tablet_id);
+    ASSERT_TRUE(result.has_value());
+    auto [retrieved_meta_2, retrieved_expiration_2] = result.value();
+    EXPECT_EQ(retrieved_meta_2->rowset_id().to_string(), 
rowset_meta_2->rowset_id().to_string());
+}
+
+} // namespace doris
diff --git a/be/test/cloud/cloud_tablet_test.cpp 
b/be/test/cloud/cloud_tablet_test.cpp
index 7f989037e2b..2fee56fbac8 100644
--- a/be/test/cloud/cloud_tablet_test.cpp
+++ b/be/test/cloud/cloud_tablet_test.cpp
@@ -940,4 +940,369 @@ TEST_F(CloudTabletSyncMetaTest, 
TestSyncMetaMultipleProperties) {
     sp->disable_processing();
     sp->clear_all_call_backs();
 }
+class CloudTabletApplyVisiblePendingTest : public testing::Test {
+public:
+    CloudTabletApplyVisiblePendingTest() : 
_engine(CloudStorageEngine(EngineOptions {})) {}
+
+    void SetUp() override {
+        _tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, 
TTabletSchema(), 6, {{7, 8}},
+                                          UniqueId(9, 10), 
TTabletType::TABLET_TYPE_DISK,
+                                          TCompressionType::LZ4F));
+        _tablet =
+                std::make_shared<CloudTablet>(_engine, 
std::make_shared<TabletMeta>(*_tablet_meta));
+    }
+
+    void TearDown() override {}
+
+    RowsetSharedPtr create_rowset(Version version, int num_segments = 1) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_version(version);
+        rs_meta->set_rowset_id(_engine.next_rowset_id());
+        rs_meta->set_num_segments(num_segments);
+        RowsetSharedPtr rowset;
+        Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, 
&rowset);
+        if (!st.ok()) {
+            return nullptr;
+        }
+        return rowset;
+    }
+
+    RowsetMetaSharedPtr create_pending_rowset_meta(int64_t version) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_version(Version(version, version));
+        rs_meta->set_rowset_id(_engine.next_rowset_id());
+        rs_meta->set_num_segments(1);
+        return rs_meta;
+    }
+
+    // Create a rowset whose RowsetMeta carries a valid TabletSchema,
+    // required as template for create_empty_rowset_for_hole.
+    RowsetSharedPtr create_rowset_with_schema(Version version, int 
num_segments = 1) {
+        auto rs_meta = std::make_shared<RowsetMeta>();
+        rs_meta->set_rowset_type(BETA_ROWSET);
+        rs_meta->set_version(version);
+        rs_meta->set_rowset_id(_engine.next_rowset_id());
+        rs_meta->set_num_segments(num_segments);
+
+        TabletSchemaPB schema_pb;
+        schema_pb.set_keys_type(KeysType::DUP_KEYS);
+        auto* col = schema_pb.add_column();
+        col->set_unique_id(0);
+        col->set_name("k1");
+        col->set_type("INT");
+        col->set_is_key(true);
+        col->set_is_nullable(false);
+        rs_meta->set_tablet_schema(schema_pb);
+
+        RowsetSharedPtr rowset;
+        Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, 
&rowset);
+        if (!st.ok()) {
+            return nullptr;
+        }
+        return rowset;
+    }
+
+    void add_initial_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
+        std::unique_lock<std::shared_mutex> 
meta_wlock(_tablet->get_header_lock());
+        _tablet->add_rowsets(std::vector<RowsetSharedPtr>(rowsets), false, 
meta_wlock, false);
+    }
+
+    void add_pending_rowset(int64_t version, RowsetMetaSharedPtr rowset_meta,
+                            int64_t expiration_time = INT64_MAX, bool is_empty 
= false) {
+        std::lock_guard<std::mutex> lock(_tablet->_visible_pending_rs_lock);
+        _tablet->_visible_pending_rs_map.emplace(
+                version, CloudTablet::VisiblePendingRowset 
{std::move(rowset_meta), expiration_time,
+                                                            is_empty});
+    }
+
+    size_t pending_rs_count() const {
+        std::lock_guard<std::mutex> lock(_tablet->_visible_pending_rs_lock);
+        return _tablet->_visible_pending_rs_map.size();
+    }
+
+protected:
+    TabletMetaSharedPtr _tablet_meta;
+    std::shared_ptr<CloudTablet> _tablet;
+    CloudStorageEngine _engine;
+};
+
+// Test apply with no pending rowsets does nothing
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyNoPendingRowsets) {
+    auto rs = create_rowset(Version(0, 1));
+    ASSERT_NE(rs, nullptr);
+    add_initial_rowsets({rs});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 1);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 1)));
+}
+
+// Test apply single consecutive non-empty rowset
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplySingleConsecutiveRowset) {
+    auto rs = create_rowset(Version(0, 1));
+    ASSERT_NE(rs, nullptr);
+    add_initial_rowsets({rs});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    add_pending_rowset(2, create_pending_rowset_meta(2));
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 2);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 2);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 1)));
+    EXPECT_TRUE(rowset_map.contains(Version(2, 2)));
+}
+
+// Test apply multiple consecutive non-empty rowsets
+TEST_F(CloudTabletApplyVisiblePendingTest, 
TestApplyMultipleConsecutiveRowsets) {
+    auto rs = create_rowset(Version(0, 1));
+    ASSERT_NE(rs, nullptr);
+    add_initial_rowsets({rs});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    for (int64_t v = 2; v <= 4; ++v) {
+        add_pending_rowset(v, create_pending_rowset_meta(v));
+    }
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 4);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 4);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 1)));
+    EXPECT_TRUE(rowset_map.contains(Version(2, 2)));
+    EXPECT_TRUE(rowset_map.contains(Version(3, 3)));
+    EXPECT_TRUE(rowset_map.contains(Version(4, 4)));
+}
+
+// Test apply with version gap - nothing should be applied
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyWithVersionGap) {
+    auto rs = create_rowset(Version(0, 1));
+    ASSERT_NE(rs, nullptr);
+    add_initial_rowsets({rs});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    // Add version 3 only, skip version 2
+    add_pending_rowset(3, create_pending_rowset_meta(3));
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 1);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 1)));
+    EXPECT_FALSE(rowset_map.contains(Version(3, 3)));
+}
+
+// Test apply with partial consecutive versions - only consecutive prefix 
applied
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyPartialConsecutive) {
+    auto rs = create_rowset(Version(0, 1));
+    ASSERT_NE(rs, nullptr);
+    add_initial_rowsets({rs});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    // Add versions 2, 3, 5 (version 4 missing)
+    add_pending_rowset(2, create_pending_rowset_meta(2));
+    add_pending_rowset(3, create_pending_rowset_meta(3));
+    add_pending_rowset(5, create_pending_rowset_meta(5));
+
+    _tablet->apply_visible_pending_rowsets();
+
+    // Only versions 2 and 3 should be applied
+    EXPECT_EQ(_tablet->max_version_unlocked(), 3);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 3);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 1)));
+    EXPECT_TRUE(rowset_map.contains(Version(2, 2)));
+    EXPECT_TRUE(rowset_map.contains(Version(3, 3)));
+    EXPECT_FALSE(rowset_map.contains(Version(5, 5)));
+}
+
+// Test apply with pending versions below max_version - nothing applied
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyPendingBelowMaxVersion) {
+    auto rs1 = create_rowset(Version(0, 1));
+    auto rs2 = create_rowset(Version(2, 5));
+    ASSERT_NE(rs1, nullptr);
+    ASSERT_NE(rs2, nullptr);
+    add_initial_rowsets({rs1, rs2});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 5);
+
+    // Add pending versions 3 and 4, both below max_version
+    add_pending_rowset(3, create_pending_rowset_meta(3));
+    add_pending_rowset(4, create_pending_rowset_meta(4));
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 5);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 2);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 1)));
+    EXPECT_TRUE(rowset_map.contains(Version(2, 5)));
+    EXPECT_FALSE(rowset_map.contains(Version(3, 3)));
+    EXPECT_FALSE(rowset_map.contains(Version(4, 4)));
+}
+
+// Test apply with initial max_version = -1 (no initial rowsets)
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyWithNoInitialRowsets) {
+    EXPECT_EQ(_tablet->max_version_unlocked(), -1);
+
+    add_pending_rowset(0, create_pending_rowset_meta(0));
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 0);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 1);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 0)));
+}
+
+// Test apply called multiple times incrementally
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyMultipleCalls) {
+    auto rs = create_rowset(Version(0, 1));
+    ASSERT_NE(rs, nullptr);
+    add_initial_rowsets({rs});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    // First apply: version 2
+    add_pending_rowset(2, create_pending_rowset_meta(2));
+    _tablet->apply_visible_pending_rowsets();
+    EXPECT_EQ(_tablet->max_version_unlocked(), 2);
+    EXPECT_TRUE(_tablet->rowset_map().contains(Version(2, 2)));
+
+    // Second apply: version 3
+    add_pending_rowset(3, create_pending_rowset_meta(3));
+    _tablet->apply_visible_pending_rowsets();
+    EXPECT_EQ(_tablet->max_version_unlocked(), 3);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 3);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 1)));
+    EXPECT_TRUE(rowset_map.contains(Version(2, 2)));
+    EXPECT_TRUE(rowset_map.contains(Version(3, 3)));
+}
+
+// Test gap resolved by later apply call
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyGapResolvedLater) {
+    auto rs = create_rowset(Version(0, 1));
+    ASSERT_NE(rs, nullptr);
+    add_initial_rowsets({rs});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    // Add version 3 first (gap at version 2)
+    add_pending_rowset(3, create_pending_rowset_meta(3));
+    _tablet->apply_visible_pending_rowsets();
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1); // Nothing applied
+    EXPECT_FALSE(_tablet->rowset_map().contains(Version(3, 3)));
+
+    // Now add version 2 to fill the gap
+    add_pending_rowset(2, create_pending_rowset_meta(2));
+    _tablet->apply_visible_pending_rowsets();
+
+    // Both versions 2 and 3 should now be applied
+    EXPECT_EQ(_tablet->max_version_unlocked(), 3);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 3);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 1)));
+    EXPECT_TRUE(rowset_map.contains(Version(2, 2)));
+    EXPECT_TRUE(rowset_map.contains(Version(3, 3)));
+}
+
+// Test clear_unused_visible_pending_rowsets removes applied entries
+TEST_F(CloudTabletApplyVisiblePendingTest, TestClearAfterApply) {
+    auto rs = create_rowset(Version(0, 1));
+    ASSERT_NE(rs, nullptr);
+    add_initial_rowsets({rs});
+
+    add_pending_rowset(2, create_pending_rowset_meta(2));
+    add_pending_rowset(3, create_pending_rowset_meta(3));
+    // Version 5 has a gap, won't be applied
+    add_pending_rowset(5, create_pending_rowset_meta(5));
+    EXPECT_EQ(pending_rs_count(), 3);
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 3);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 3);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 1)));
+    EXPECT_TRUE(rowset_map.contains(Version(2, 2)));
+    EXPECT_TRUE(rowset_map.contains(Version(3, 3)));
+    EXPECT_FALSE(rowset_map.contains(Version(5, 5)));
+    // Versions 2 and 3 are cleared (applied, version <= max_version)
+    // Version 5 remains (not applied, not expired)
+    EXPECT_EQ(pending_rs_count(), 1);
+}
+
+// Test empty rowset with no existing versions breaks early
+TEST_F(CloudTabletApplyVisiblePendingTest, 
TestApplyEmptyRowsetNoExistingVersions) {
+    EXPECT_EQ(_tablet->max_version_unlocked(), -1);
+
+    // Add empty pending rowset at version 0
+    add_pending_rowset(0, nullptr, INT64_MAX, true);
+
+    _tablet->apply_visible_pending_rowsets();
+
+    // Cannot create empty rowset without a previous rowset as template
+    EXPECT_EQ(_tablet->max_version_unlocked(), -1);
+    EXPECT_EQ(_tablet->rowset_map().size(), 0);
+}
+
+// Test empty rowset with existing version uses create_empty_rowset_for_hole
+TEST_F(CloudTabletApplyVisiblePendingTest, 
TestApplyEmptyRowsetWithExistingVersion) {
+    auto rs = create_rowset_with_schema(Version(0, 1));
+    ASSERT_NE(rs, nullptr);
+    add_initial_rowsets({rs});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    // Add empty pending rowset at version 2
+    add_pending_rowset(2, nullptr, INT64_MAX, true);
+
+    _tablet->apply_visible_pending_rowsets();
+
+    EXPECT_EQ(_tablet->max_version_unlocked(), 2);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 2);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 1)));
+    EXPECT_TRUE(rowset_map.contains(Version(2, 2)));
+}
+
+// Test mixed non-empty followed by empty rowset
+TEST_F(CloudTabletApplyVisiblePendingTest, TestApplyNonEmptyThenEmptyRowset) {
+    auto rs = create_rowset_with_schema(Version(0, 1));
+    ASSERT_NE(rs, nullptr);
+    add_initial_rowsets({rs});
+    EXPECT_EQ(_tablet->max_version_unlocked(), 1);
+
+    // Version 2: non-empty (with schema for empty rowset template), Version 
3: empty
+    auto pending_meta = create_pending_rowset_meta(2);
+    TabletSchemaPB schema_pb;
+    schema_pb.set_keys_type(KeysType::DUP_KEYS);
+    auto* col = schema_pb.add_column();
+    col->set_unique_id(0);
+    col->set_name("k1");
+    col->set_type("INT");
+    col->set_is_key(true);
+    col->set_is_nullable(false);
+    pending_meta->set_tablet_schema(schema_pb);
+    add_pending_rowset(2, std::move(pending_meta));
+    add_pending_rowset(3, nullptr, INT64_MAX, true);
+
+    _tablet->apply_visible_pending_rowsets();
+
+    // Both should be applied; empty rowset uses to_add.back() as prev_rowset
+    EXPECT_EQ(_tablet->max_version_unlocked(), 3);
+    auto& rowset_map = _tablet->rowset_map();
+    EXPECT_EQ(rowset_map.size(), 3);
+    EXPECT_TRUE(rowset_map.contains(Version(0, 1)));
+    EXPECT_TRUE(rowset_map.contains(Version(2, 2)));
+    EXPECT_TRUE(rowset_map.contains(Version(3, 3)));
+}
+
 } // namespace doris
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 15135eb0f60..ed5539d9388 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3346,6 +3346,9 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = true)
     public static long mow_get_ms_lock_retry_backoff_interval = 80;
 
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_notify_be_after_load_txn_commit = false;
+
     // ATTN: DONOT add any config not related to cloud mode here
     // ATTN: DONOT add any config not related to cloud mode here
     // ATTN: DONOT add any config not related to cloud mode here
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 4fe2e9cc61a..043128d02fd 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -101,6 +101,7 @@ import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.CalcDeleteBitmapTask;
+import org.apache.doris.task.MakeCloudTmpRsVisibleTask;
 import org.apache.doris.thrift.TCalcDeleteBitmapPartitionInfo;
 import org.apache.doris.thrift.TStatus;
 import org.apache.doris.thrift.TStatusCode;
@@ -494,11 +495,20 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
 
     /**
      * Post process of commitTxn
-     * 1. update some stats
-     * 2. produce event for further processes like async MV
+     * 1. notify BEs to make temporary rowsets visible
+     * 2. update some stats
+     * 3. produce event for further processes like async MV
      * @param commitTxnResponse commit txn call response from meta-service
+     * @param tabletCommitInfos tablet commit infos containing backend and 
tablet mapping
      */
-    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
+    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<TabletCommitInfo> tabletCommitInfos) {
+        // ========================================
+        // notify BEs to make temporary rowsets visible
+        // ========================================
+        if (tabletCommitInfos != null) {
+            notifyBesMakeTmpRsVisible(commitTxnResponse, tabletCommitInfos);
+        }
+
         // ========================================
         // update some table stats
         // ========================================
@@ -723,11 +733,12 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
-        executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC, 
txnCommitAttachment);
+        executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC, 
txnCommitAttachment, tabletCommitInfos);
     }
 
     private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest, 
long transactionId, boolean is2PC,
-            TxnCommitAttachment txnCommitAttachment) throws UserException {
+            TxnCommitAttachment txnCommitAttachment, List<TabletCommitInfo> 
tabletCommitInfos)
+                throws UserException {
         if (DebugPointUtil.isEnable("FE.mow.commit.exception")) {
             LOG.info("debug point FE.mow.commit.exception, throw e");
             throw new UserException(InternalErrorCode.INTERNAL_ERR, "debug 
point FE.mow.commit.exception");
@@ -750,7 +761,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         StopWatch stopWatch = new StopWatch();
         stopWatch.start();
         try {
-            txnState = commitTxn(commitTxnRequest, transactionId, is2PC);
+            txnState = commitTxn(commitTxnRequest, transactionId, is2PC, 
tabletCommitInfos);
             txnOperated = true;
             if 
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout"))
 {
                 throw new 
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
@@ -789,8 +800,8 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
     }
 
-    private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long 
transactionId, boolean is2PC)
-            throws UserException {
+    private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long 
transactionId, boolean is2PC,
+            List<TabletCommitInfo> tabletCommitInfos) throws UserException {
         checkCommitInfo(commitTxnRequest);
 
         CommitTxnResponse commitTxnResponse = null;
@@ -850,7 +861,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
             MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
             MetricRepo.HISTO_TXN_EXEC_LATENCY.update(txnState.getCommitTime() 
- txnState.getPrepareTime());
         }
-        afterCommitTxnResp(commitTxnResponse);
+        afterCommitTxnResp(commitTxnResponse, tabletCommitInfos);
         return txnState;
     }
 
@@ -1639,7 +1650,7 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
 
         final CommitTxnRequest commitTxnRequest = builder.build();
-        executeCommitTxnRequest(commitTxnRequest, transactionId, false, null);
+        executeCommitTxnRequest(commitTxnRequest, transactionId, false, null, 
null);
     }
 
     private List<Table> getTablesNeedCommitLock(List<Table> tableList) {
@@ -2705,4 +2716,111 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     private void clearTxnLastSignature(long dbId, long txnId) {
         txnLastSignatureMap.remove(txnId);
     }
+
+    /**
+     * Notify BEs to make temporary cloud rowsets visible after transaction 
commit.
+     * This method is called in afterCommitTxnResp to notify BEs to promote
+     * the temporary rowsets from CloudCommittedRSMgr to tablet meta.
+     *
+     * @param commitTxnResponse commit txn response from meta-service
+     * @param tabletCommitInfos tablet commit infos containing backend and 
tablet mapping
+     */
+    private void notifyBesMakeTmpRsVisible(CommitTxnResponse commitTxnResponse,
+                                           List<TabletCommitInfo> 
tabletCommitInfos) {
+        if (tabletCommitInfos == null || tabletCommitInfos.isEmpty()
+                || !Config.enable_notify_be_after_load_txn_commit) {
+            return;
+        }
+        long txnId = commitTxnResponse.getTxnInfo().getTxnId();
+        if (DebugPointUtil.isEnable("notifyBesMakeTmpRsVisible.skip")) {
+            LOG.info("skip sendMakeCloudTmpRsVisibleTasks, txn_id: {}", txnId);
+            return;
+        }
+
+        try {
+            // Convert TabletCommitInfo to TTabletCommitInfo
+            List<TTabletCommitInfo> tTabletCommitInfos = Lists.newArrayList();
+            for (TabletCommitInfo commitInfo : tabletCommitInfos) {
+                TTabletCommitInfo tCommitInfo = new TTabletCommitInfo();
+                tCommitInfo.setTabletId(commitInfo.getTabletId());
+                tCommitInfo.setBackendId(commitInfo.getBackendId());
+                tTabletCommitInfos.add(tCommitInfo);
+            }
+
+            // Build partition version map from commit response
+            Map<Long, Long> partitionVersionMap = Maps.newHashMap();
+            int totalPartitionNum = 
commitTxnResponse.getPartitionIdsList().size();
+            for (int idx = 0; idx < totalPartitionNum; ++idx) {
+                long partitionId = commitTxnResponse.getPartitionIds(idx);
+                long version = commitTxnResponse.getVersions(idx);
+                partitionVersionMap.put(partitionId, version);
+            }
+
+            long updateVersionVisibleTime = 
commitTxnResponse.getVersionUpdateTimeMs();
+
+            // Send tasks to notify BEs
+            sendMakeCloudTmpRsVisibleTasks(txnId, tTabletCommitInfos,
+                    partitionVersionMap, updateVersionVisibleTime);
+        } catch (Throwable t) {
+            // According to normal logic, no exceptions will be thrown,
+            // but in order to avoid bugs affecting the original logic, all 
exceptions are caught
+            LOG.warn("notifyBesMakeTmpRsVisible failed, txn_id: {}",
+                    commitTxnResponse.getTxnInfo().getTxnId(), t);
+        }
+    }
+
+    /**
+     * Send agent tasks to notify BEs to make temporary cloud committed 
rowsets visible.
+     * This is called after transaction commit to MS, to notify BEs to promote
+     * rowset meta from CloudCommittedRSMgr to tablet meta.
+     *
+     * just send notify rpc with best effort, no need to retry or guarantee 
all BEs receive the rpc.
+     * @param txnId transaction id
+     * @param commitInfos tablet commit infos containing backend and tablet 
mapping
+     * @param partitionVersionMap partition id to version mapping
+     * @param updateVersionVisibleTime visible time for the version
+     */
+    public void sendMakeCloudTmpRsVisibleTasks(long txnId,
+                                               List<TTabletCommitInfo> 
commitInfos,
+                                               Map<Long, Long> 
partitionVersionMap,
+                                               long updateVersionVisibleTime) {
+        if (commitInfos == null || commitInfos.isEmpty()) {
+            LOG.info("no commit infos to send make cloud tmp rs visible tasks, 
txn_id: {}", txnId);
+            return;
+        }
+
+        // Group tablet_ids by backend_id
+        Map<Long, List<Long>> beToTabletIds = Maps.newHashMap();
+        for (TTabletCommitInfo commitInfo : commitInfos) {
+            long backendId = commitInfo.getBackendId();
+            long tabletId = commitInfo.getTabletId();
+            beToTabletIds.computeIfAbsent(backendId, k -> 
Lists.newArrayList()).add(tabletId);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("send make cloud tmp rs visible tasks, txn_id: {}, 
backend_count: {}, total_tablets: {}",
+                    txnId, beToTabletIds.size(), commitInfos.size());
+        }
+
+        // Create agent tasks for each BE
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (Map.Entry<Long, List<Long>> entry : beToTabletIds.entrySet()) {
+            long backendId = entry.getKey();
+            List<Long> tabletIds = entry.getValue();
+
+            MakeCloudTmpRsVisibleTask task = new MakeCloudTmpRsVisibleTask(
+                    backendId, txnId, tabletIds, partitionVersionMap, 
updateVersionVisibleTime);
+            batchTask.addTask(task);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("add make cloud tmp rs visible task, txn_id: {}, 
backend_id: {}, tablet_count: {}",
+                        txnId, backendId, tabletIds.size());
+            }
+        }
+
+        // Submit tasks
+        AgentTaskExecutor.submit(batchTask);
+        LOG.info("sent make cloud tmp rs visible tasks, txn_id: {}, 
backend_count: {}, total_tablets: {}",
+                txnId, beToTabletIds.size(), commitInfos.size());
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index c537c42c4c7..56040809b5b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -5098,7 +5098,7 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
                 return new TStatus(TStatusCode.INVALID_ARGUMENT);
             }
             CommitTxnResponse commitTxnResponse = 
CommitTxnResponse.parseFrom(receivedProtobufBytes);
-            
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse);
+            
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse, 
null);
         } catch (InvalidProtocolBufferException e) {
             // Handle the exception, log it, or take appropriate action
             e.printStackTrace();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 6ae8c9014bb..6b0928eb49d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -41,6 +41,7 @@ import org.apache.doris.thrift.TCreateTabletReq;
 import org.apache.doris.thrift.TDownloadReq;
 import org.apache.doris.thrift.TDropTabletReq;
 import org.apache.doris.thrift.TGcBinlogReq;
+import org.apache.doris.thrift.TMakeCloudTmpRsVisibleRequest;
 import org.apache.doris.thrift.TMoveDirReq;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPublishVersionRequest;
@@ -499,6 +500,15 @@ public class AgentBatchTask implements Runnable {
                 tAgentTaskRequest.setCleanUdfCacheReq(request);
                 return tAgentTaskRequest;
             }
+            case MAKE_CLOUD_COMMITTED_RS_VISIBLE: {
+                MakeCloudTmpRsVisibleTask makeCloudTmpRsVisibleTask = 
(MakeCloudTmpRsVisibleTask) task;
+                TMakeCloudTmpRsVisibleRequest request = 
makeCloudTmpRsVisibleTask.toThrift();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(request.toString());
+                }
+                tAgentTaskRequest.setMakeCloudTmpRsVisibleReq(request);
+                return tAgentTaskRequest;
+            }
             default:
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("could not find task type for task [{}]", task);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/MakeCloudTmpRsVisibleTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/MakeCloudTmpRsVisibleTask.java
new file mode 100644
index 00000000000..62626c90ba5
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/task/MakeCloudTmpRsVisibleTask.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.thrift.TMakeCloudTmpRsVisibleRequest;
+import org.apache.doris.thrift.TTaskType;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Task to notify BE to make committed cloud rowsets visible.
+ * After FE commits a transaction to MS, this task notifies BE to promote
+ * the committed rowsets in BE's memory to tablet meta.
+ */
+public class MakeCloudTmpRsVisibleTask extends AgentTask {
+    private final long txnId;
+    private final List<Long> tabletIds; // tablets on this BE involved in the 
transaction
+    private final Map<Long, Long> partitionVersionMap; // partition_id -> 
version
+    private final long updateVersionVisibleTime;
+
+    public MakeCloudTmpRsVisibleTask(long backendId, long txnId,
+                                     List<Long> tabletIds,
+                                     Map<Long, Long> partitionVersionMap,
+                                     long updateVersionVisibleTime) {
+        super(null, backendId, TTaskType.MAKE_CLOUD_COMMITTED_RS_VISIBLE,
+                -1L, -1L, -1L, -1L, -1L, txnId, System.currentTimeMillis());
+        this.txnId = txnId;
+        this.tabletIds = tabletIds;
+        this.partitionVersionMap = partitionVersionMap;
+        this.updateVersionVisibleTime = updateVersionVisibleTime;
+    }
+
+    public long getTxnId() {
+        return txnId;
+    }
+
+    public List<Long> getTabletIds() {
+        return tabletIds;
+    }
+
+    public Map<Long, Long> getPartitionVersionMap() {
+        return partitionVersionMap;
+    }
+
+    public long getUpdateVersionVisibleTime() {
+        return updateVersionVisibleTime;
+    }
+
+    public TMakeCloudTmpRsVisibleRequest toThrift() {
+        TMakeCloudTmpRsVisibleRequest request = new 
TMakeCloudTmpRsVisibleRequest();
+        request.setTxnId(txnId);
+        request.setTabletIds(tabletIds);
+        request.setPartitionVersionMap(partitionVersionMap);
+        request.setVersionUpdateTimeMs(updateVersionVisibleTime);
+        return request;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 614448891a2..b5e30e9893d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -230,7 +230,7 @@ public class GlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     }
 
     @Override
-    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse) {
+    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<TabletCommitInfo> tabletCommitInfos) {
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
index d56193fe683..b611ff4e588 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgrIface.java
@@ -210,7 +210,7 @@ public interface GlobalTransactionMgrIface extends Writable 
{
 
     public void 
replayBatchRemoveTransactionV2(BatchRemoveTransactionsOperationV2 operation) 
throws Exception;
 
-    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse);
+    public void afterCommitTxnResp(CommitTxnResponse commitTxnResponse, 
List<TabletCommitInfo> tabletCommitInfos);
 
     public void addSubTransaction(long dbId, long transactionId, long 
subTransactionId);
 
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index 51bd59e7ae7..c8c65513dc2 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -568,6 +568,14 @@ struct TPushCooldownConfReq {
     1: required list<TCooldownConf> cooldown_confs
 }
 
+// Request to make temporary cloud rowsets visible
+struct TMakeCloudTmpRsVisibleRequest {
+    1: required i64 txn_id
+    2: required list<Types.TTabletId> tablet_ids // tablets on this BE 
involved in the transaction
+    3: required map<Types.TPartitionId, Types.TVersion> partition_version_map
+    4: optional i64 version_update_time_ms
+}
+
 struct TAgentTaskRequest {
     1: required TAgentServiceVersion protocol_version
     2: required Types.TTaskType task_type
@@ -610,6 +618,7 @@ struct TAgentTaskRequest {
 
     // For cloud
     1000: optional TCalcDeleteBitmapRequest calc_delete_bitmap_req
+    1001: optional TMakeCloudTmpRsVisibleRequest make_cloud_tmp_rs_visible_req
 }
 
 struct TAgentResult {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 87be26403d6..c58148197aa 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -249,7 +249,8 @@ enum TTaskType {
     PUSH_INDEX_POLICY = 35,
 
     // CLOUD
-    CALCULATE_DELETE_BITMAP = 1000
+    CALCULATE_DELETE_BITMAP = 1000,
+    MAKE_CLOUD_COMMITTED_RS_VISIBLE = 1001
 }
 
 // level of verboseness for "explain" output
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.out
new file mode 100644
index 00000000000..086ffb310ef
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.out
@@ -0,0 +1,20 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !1_1 --
+1      1       1
+
+-- !1_2 --
+1      1       1
+1      1       1
+1      1       1
+1      1       1
+
+-- !2_1 --
+1      1       1
+1      1       1
+1      1       1
+1      1       1
+1      1       1
+1      1       1
+1      1       1
+1      1       1
+
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.out
new file mode 100644
index 00000000000..7274597e33c
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.out
@@ -0,0 +1,36 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !1_1 --
+1      1       1
+2      1       1
+3      1       1
+
+-- !1_2 --
+1      1       1
+1      10      10
+1      20      20
+2      1       1
+2      20      20
+3      1       1
+3      30      30
+4      10      10
+5      20      20
+5      30      30
+6      30      30
+
+-- !2_1 --
+1      1       1
+1      10      10
+1      20      20
+2      1       1
+2      20      20
+3      1       1
+3      30      30
+4      10      10
+5      20      20
+5      30      30
+6      30      30
+100    100     100
+100    100     100
+100    100     100
+100    100     100
+
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.out
new file mode 100644
index 00000000000..aae048f27ee
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.out
@@ -0,0 +1,38 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !1_1 --
+1      1       1
+
+-- !1_2 --
+1      1       1
+1      1       1
+2      2       2
+3      3       3
+3      3       3
+
+-- !2_1 --
+1      1       1
+1      1       1
+1      1       1
+1      1       1
+1      1       1
+1      1       1
+2      2       2
+2      2       2
+2      2       2
+3      3       3
+3      3       3
+3      3       3
+3      3       3
+3      3       3
+3      3       3
+3      3       3
+3      3       3
+
+-- !1_1 --
+1      1       1
+
+-- !1_2 --
+1      1       1
+2      2       2
+3      3       3
+
diff --git 
a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.out
 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.out
new file mode 100644
index 00000000000..fe055936213
--- /dev/null
+++ 
b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.out
@@ -0,0 +1,14 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !1 --
+1      1       1
+2      1       1
+3      1       1
+
+-- !2 --
+1      20      20
+2      20      20
+3      30      30
+4      10      10
+5      30      30
+6      30      30
+
diff --git a/regression-test/pipeline/cloud_p0/conf/be_custom.conf 
b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
index a5da49e5604..01753481dc3 100644
--- a/regression-test/pipeline/cloud_p0/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/be_custom.conf
@@ -64,3 +64,6 @@ enable_python_udf_support=true
 python_env_mode=conda
 python_conda_root_path=/opt/miniconda3
 max_python_process_num=64
+
+enable_cloud_make_rs_visible_on_be=true
+cloud_mow_sync_rowsets_when_load_txn_begin=false
diff --git a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf 
b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
index 704116ce6aa..601a7015223 100644
--- a/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
+++ b/regression-test/pipeline/cloud_p0/conf/fe_custom.conf
@@ -46,4 +46,6 @@ workload_group_max_num = 25
 enable_advance_next_id = true
 
 check_table_lock_leaky = true
-enable_outfile_to_local=true
\ No newline at end of file
+enable_outfile_to_local=true
+
+enable_notify_be_after_load_txn_commit=true
\ No newline at end of file
diff --git a/regression-test/pipeline/cloud_p1/conf/be_custom.conf 
b/regression-test/pipeline/cloud_p1/conf/be_custom.conf
index f8bdabcb15f..0b9d27e98a7 100644
--- a/regression-test/pipeline/cloud_p1/conf/be_custom.conf
+++ b/regression-test/pipeline/cloud_p1/conf/be_custom.conf
@@ -46,3 +46,6 @@ enable_python_udf_support=true
 python_env_mode=conda
 python_conda_root_path=/opt/miniconda3
 max_python_process_num=64
+
+enable_cloud_make_rs_visible_on_be=true
+cloud_mow_sync_rowsets_when_load_txn_begin=false
diff --git a/regression-test/pipeline/cloud_p1/conf/fe_custom.conf 
b/regression-test/pipeline/cloud_p1/conf/fe_custom.conf
index b91a4ed6d38..157f3a07a09 100644
--- a/regression-test/pipeline/cloud_p1/conf/fe_custom.conf
+++ b/regression-test/pipeline/cloud_p1/conf/fe_custom.conf
@@ -37,3 +37,5 @@ enable_advance_next_id = true
 
 arrow_flight_sql_port = 8081
 enable_job_schedule_second_for_test = true
+
+enable_notify_be_after_load_txn_commit=true
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.groovy
new file mode 100644
index 00000000000..ba016c7f05a
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_forward_notify_be_after_txn_commit.groovy
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_dup_forward_notify_be_after_txn_commit", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    def getTabletAndBackend = { def tableName ->
+        def backends = sql_return_maparray('show backends')
+        def tabletStats = sql_return_maparray("show tablets from 
${tableName};")
+        assert tabletStats.size() == 1
+        def tabletId = tabletStats[0].TabletId
+        def tabletBackendId = tabletStats[0].BackendId
+        def tabletBackend
+        for (def be : backends) {
+            if (be.BackendId == tabletBackendId) {
+                tabletBackend = be
+                break;
+            }
+        }
+        logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with 
backendId=${tabletBackend.BackendId}");
+        return [tabletId, tabletBackend]
+    }
+
+    def customFeConfig = [
+        enable_notify_be_after_load_txn_commit: true
+    ]
+    def customBeConfig = [
+        enable_cloud_make_rs_visible_on_be : true,
+        cloud_mow_sync_rowsets_when_load_txn_begin : false,
+        enable_stream_load_commit_txn_on_be : true // commit txn to MS 
directly on BE
+    ]
+
+    def getTabletRowsets = {def tableName ->
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
+        assert tablets.size() == 1
+        String compactionUrl = tablets[0]["CompactionStatus"]
+        def (code, out, err) = curl("GET", compactionUrl)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        return tabletJson.rowsets
+    }
+
+    def executeStreamLoad = { def tableName -> 
+        String data = """1,1,1"""
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            set 'format', 'csv'
+            inputStream new ByteArrayInputStream(data.getBytes())
+            time 10000
+        }
+    }
+
+    setFeConfigTemporary(customFeConfig) {
+        setBeConfigTemporary(customBeConfig) {
+            try {
+                def table1 = 
"test_cloud_dup_forward_notify_be_after_txn_commit"
+                sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+                sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                            `k1` int NOT NULL,
+                            `c1` int,
+                            `c2` int
+                            )duplicate KEY(k1)
+                        DISTRIBUTED BY HASH(k1) BUCKETS 1
+                        PROPERTIES (
+                            "disable_auto_compaction" = "true",
+                            "replication_num" = "1"); """
+                def (tabletId, tabletBackend) = getTabletAndBackend(table1)
+
+                executeStreamLoad(table1) // ver=2
+                qt_1_1 "select * from ${table1} order by k1;"
+
+                
GetDebugPoint().enableDebugPointForAllFEs("sendMakeCloudTmpRsVisibleTasks.skip");
+                // inject error to ordinary sync_rowsets calls
+                
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
 ["tablet_id": tabletId])
+
+                // 1. test that after turn on the notify feature, rowsets will 
be visible on BE without sync_rowsets
+                executeStreamLoad(table1) // ver=3
+                executeStreamLoad(table1) // ver=4
+                executeStreamLoad(table1) // ver=5
+                sleep(500)
+                assert getTabletRowsets(table1).size() == 5
+                qt_1_2 "select * from ${table1} order by k1;"
+                assert getTabletRowsets(table1).size() == 5
+
+                // 2. test the notify rpc arrived not in order
+                // block the notify rpc for version 8
+                
GetDebugPoint().enableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block",
 ["tablet_id": tabletId, "version": 8])
+                executeStreamLoad(table1) // ver=6
+                executeStreamLoad(table1) //ver=7
+                sleep(500)
+                assert getTabletRowsets(table1).size() == 7
+                executeStreamLoad(table1) // ver=8
+                executeStreamLoad(table1) // ver=9
+                // due the miss of rowset of version 8, version 8 and version 
9 will not be added to BE's tablet meta
+                sleep(500)
+                assert getTabletRowsets(table1).size() == 7
+                
GetDebugPoint().disableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block")
+                sleep(500)
+                assert getTabletRowsets(table1).size() == 9
+                qt_2_1 "select * from ${table1} order by k1;"
+
+            } catch (Exception e) {
+                logger.info(e.getMessage())
+                throw e
+            } finally {
+                GetDebugPoint().clearDebugPointsForAllFEs()
+                GetDebugPoint().clearDebugPointsForAllBEs()
+            }
+        }
+    }
+}
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.groovy
new file mode 100644
index 00000000000..ad3ad1a0766
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_dup_notify_be_after_txn_commit.groovy
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_dup_notify_be_after_txn_commit", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    def getTabletAndBackend = { def tableName ->
+        def backends = sql_return_maparray('show backends')
+        def tabletStats = sql_return_maparray("show tablets from 
${tableName};")
+        assert tabletStats.size() == 1
+        def tabletId = tabletStats[0].TabletId
+        def tabletBackendId = tabletStats[0].BackendId
+        def tabletBackend
+        for (def be : backends) {
+            if (be.BackendId == tabletBackendId) {
+                tabletBackend = be
+                break;
+            }
+        }
+        logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with 
backendId=${tabletBackend.BackendId}");
+        return [tabletId, tabletBackend]
+    }
+
+    def feConfig1 = [
+        enable_notify_be_after_load_txn_commit: false
+    ]
+    def beConfig1 = [
+        enable_cloud_make_rs_visible_on_be : false,
+        cloud_mow_sync_rowsets_when_load_txn_begin : false
+    ]
+    // 0. test the injected error in sync_rowsets will cause the query failure
+    setFeConfigTemporary(feConfig1) {
+        setBeConfigTemporary(beConfig1) {
+            try {
+                def table1 = "test_cloud_dup_notify_be_after_txn_commit_error"
+                sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+                sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                            `k1` int NOT NULL,
+                            `c1` int,
+                            `c2` int
+                            )duplicate KEY(k1)
+                        DISTRIBUTED BY HASH(k1) BUCKETS 1
+                        PROPERTIES (
+                            "disable_auto_compaction" = "true",
+                            "replication_num" = "1"); """
+                def (tabletId, tabletBackend) = getTabletAndBackend(table1)
+                sql "insert into ${table1} values(1,1,1),(2,1,1),(3,1,1);"
+
+                // inject error to ordinary sync_rowsets calls
+                
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
 ["tablet_id": tabletId])
+                // ensure that the injected error will cause the query failure
+                test {
+                    sql "select * from ${table1} order by k1;"
+                    exception "[sync_tablet_rowsets_unlocked] injected error 
for testing"
+                }
+
+            } catch (Exception e) {
+                logger.info(e.getMessage())
+                throw e
+            } finally {
+                GetDebugPoint().clearDebugPointsForAllFEs()
+                GetDebugPoint().clearDebugPointsForAllBEs()
+            }
+        }
+    }
+
+    def customFeConfig = [
+        enable_notify_be_after_load_txn_commit: true
+    ]
+    def customBeConfig = [
+        enable_cloud_make_rs_visible_on_be : true,
+        cloud_mow_sync_rowsets_when_load_txn_begin : false
+    ]
+
+    def getTabletRowsets = {def tableName ->
+        def tablets = sql_return_maparray """ show tablets from ${tableName}; 
"""
+        assert tablets.size() == 1
+        String compactionUrl = tablets[0]["CompactionStatus"]
+        def (code, out, err) = curl("GET", compactionUrl)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        return tabletJson.rowsets
+    }
+
+    setFeConfigTemporary(customFeConfig) {
+        setBeConfigTemporary(customBeConfig) {
+            try {
+                def table1 = "test_cloud_dup_notify_be_after_txn_commit"
+                sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+                sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                            `k1` int NOT NULL,
+                            `c1` int,
+                            `c2` int
+                            )duplicate KEY(k1)
+                        DISTRIBUTED BY HASH(k1) BUCKETS 1
+                        PROPERTIES (
+                            "disable_auto_compaction" = "true",
+                            "replication_num" = "1"); """
+                def (tabletId, tabletBackend) = getTabletAndBackend(table1)
+
+                sql "insert into ${table1} values(1,1,1),(2,1,1),(3,1,1);" // 
ver=2
+                qt_1_1 "select * from ${table1} order by k1;"
+
+                // inject error to ordinary sync_rowsets calls
+                
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
 ["tablet_id": tabletId])
+
+                // 1. test that after turn on the notify feature, rowsets will 
be visible on BE without sync_rowsets
+                sql "insert into ${table1} values(1,10,10),(4,10,10);" // ver=3
+                sql "insert into ${table1} 
values(2,20,20),(5,20,20),(1,20,20);" // ver=4
+                sql "insert into ${table1} 
values(3,30,30),(6,30,30),(5,30,30);" // ver=5
+                sleep(500)
+                assert getTabletRowsets(table1).size() == 5
+                qt_1_2 "select * from ${table1} order by k1;"
+                assert getTabletRowsets(table1).size() == 5
+
+                // 2. test the notify rpc arrived not in order
+                // block the notify rpc for version 8
+                
GetDebugPoint().enableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block",
 ["tablet_id": tabletId, "version": 8])
+                sql "insert into ${table1} values(100,100,100);" // ver=6
+                sql "insert into ${table1} values(100,100,100);" //ver=7
+                sleep(500)
+                assert getTabletRowsets(table1).size() == 7
+                sql "insert into ${table1} values(100,100,100);" // ver=8
+                sql "insert into ${table1} values(100,100,100);" // ver=9
+                // due the miss of rowset of version 8, version 8 and version 
9 will not be added to BE's tablet meta
+                sleep(500)
+                assert getTabletRowsets(table1).size() == 7
+                
GetDebugPoint().disableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block")
+                sleep(500)
+                assert getTabletRowsets(table1).size() == 9
+                qt_2_1 "select * from ${table1} order by k1;"
+
+            } catch (Exception e) {
+                logger.info(e.getMessage())
+                throw e
+            } finally {
+                GetDebugPoint().clearDebugPointsForAllFEs()
+                GetDebugPoint().clearDebugPointsForAllBEs()
+            }
+        }
+    }
+}
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.groovy
new file mode 100644
index 00000000000..754c2affd82
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_empty_rs_notify_be_after_txn_commit.groovy
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_empty_rs_notify_be_after_txn_commit", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    def getTabletAndBackend = { def tableName, int index ->
+        def backends = sql_return_maparray('show backends')
+        def tabletStats = sql_return_maparray("show tablets from 
${tableName};")
+        assert tabletStats.size() > index
+        def tabletId = tabletStats[index].TabletId
+        def tabletBackendId = tabletStats[index].BackendId
+        def tabletBackend
+        for (def be : backends) {
+            if (be.BackendId == tabletBackendId) {
+                tabletBackend = be
+                break;
+            }
+        }
+        logger.info("tablet ${tabletId},index=${index} on backend 
${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
+        return [tabletId, tabletBackend]
+    }
+
+    def getTableId = {def tabletId ->
+        def info = sql_return_maparray """ show tablet ${tabletId}; """
+        assert info.size() == 1
+        return info[0]["TableId"]
+    }
+
+    def customFeConfig = [
+        enable_notify_be_after_load_txn_commit: true
+    ]
+    def customBeConfig = [
+        enable_cloud_make_rs_visible_on_be : true,
+        cloud_mow_sync_rowsets_when_load_txn_begin : false,
+        skip_writing_empty_rowset_metadata : true // empty rowset opt
+    ]
+
+    def getTabletRowsets = {def tabletId ->
+        def info = sql_return_maparray """ show tablet ${tabletId}; """
+        assert info.size() == 1
+        def detail = sql_return_maparray """${info[0]["DetailCmd"]}"""
+        assert detail instanceof List
+        assert detail.size() == 1
+        def compactionUrl = detail[0]["CompactionStatus"]
+        def (code, out, err) = curl("GET", compactionUrl)
+        def tabletJson = parseJson(out.trim())
+        assert tabletJson.rowsets instanceof List
+        return tabletJson.rowsets
+    }
+
+    // duplicate table
+    setFeConfigTemporary(customFeConfig) {
+        setBeConfigTemporary(customBeConfig) {
+            try {
+                def table1 = 
"test_cloud_dup_empty_rs_notify_be_after_txn_commit"
+                sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+                sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                            `k1` int NOT NULL,
+                            `c1` int,
+                            `c2` int
+                            )duplicate KEY(k1)
+                        DISTRIBUTED BY HASH(k1) BUCKETS 2
+                        PROPERTIES (
+                            "disable_auto_compaction" = "true",
+                            "replication_num" = "1"); """
+                def (tablet1Id, tablet1Backend) = getTabletAndBackend(table1, 
0)
+                def (tablet2Id, tablet2Backend) = getTabletAndBackend(table1, 
1)
+
+                def tableId = getTableId(tablet1Id)
+
+                sql "insert into ${table1} values(1,1,1);" // ver=2
+                qt_1_1 "select * from ${table1} order by k1;"
+
+                // inject error to ordinary sync_rowsets calls
+                
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
 ["table_id": tableId])
+
+                // 1. test that after turn on the notify feature, rowsets will 
be visible on BE without sync_rowsets
+                sql "insert into ${table1} values(1,1,1);" // ver=3
+                sql "insert into ${table1} values(2,2,2);" // ver=4
+                sql "insert into ${table1} values(3,3,3);" // ver=5
+                sql "insert into ${table1} values(3,3,3);" // ver=6
+                sleep(500)
+                assert getTabletRowsets(tablet1Id).size() == 6
+                assert getTabletRowsets(tablet2Id).size() == 6
+                qt_1_2 "select * from ${table1} order by k1;"
+                assert getTabletRowsets(tablet1Id).size() == 6
+                assert getTabletRowsets(tablet2Id).size() == 6
+
+
+                // 2. test the notify rpc arrived not in order
+                
GetDebugPoint().enableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block",
 ["table_id": tableId, "version": 7])
+                sql "insert into ${table1} values(1,1,1);"
+                sql "insert into ${table1} values(1,1,1);"
+                sql "insert into ${table1} values(3,3,3);"
+                sql "insert into ${table1} values(3,3,3);"
+                sql "insert into ${table1} values(1,1,1);"
+                sql "insert into ${table1} values(1,1,1);"
+                sql "insert into ${table1} values(3,3,3);"
+                sql "insert into ${table1} values(3,3,3);"
+                sql "insert into ${table1} values(2,2,2);"
+                sql "insert into ${table1} values(2,2,2);"
+                sql "insert into ${table1} values(3,3,3);"
+                sql "insert into ${table1} values(3,3,3);"
+                sleep(500)
+                assert getTabletRowsets(tablet1Id).size() == 6
+                assert getTabletRowsets(tablet2Id).size() == 6
+                
GetDebugPoint().disableDebugPointForAllBEs("make_cloud_committed_rs_visible_callback.block")
+                sleep(500)
+                assert getTabletRowsets(tablet1Id).size() == 18
+                assert getTabletRowsets(tablet2Id).size() == 18
+                qt_2_1 "select * from ${table1} order by k1;"
+
+            } catch (Exception e) {
+                logger.info(e.getMessage())
+                throw e
+            } finally {
+                GetDebugPoint().clearDebugPointsForAllFEs()
+                GetDebugPoint().clearDebugPointsForAllBEs()
+            }
+        }
+    }
+
+
+    // mow table
+    setFeConfigTemporary(customFeConfig) {
+        setBeConfigTemporary(customBeConfig) {
+            try {
+                def table1 = 
"test_cloud_mow_empty_rs_notify_be_after_txn_commit"
+                sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+                sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                            `k1` int NOT NULL,
+                            `c1` int,
+                            `c2` int
+                            )unique KEY(k1)
+                        DISTRIBUTED BY HASH(k1) BUCKETS 2
+                        PROPERTIES (
+                            "disable_auto_compaction" = "true",
+                            "enable_unique_key_merge_on_write" = "true",
+                            "replication_num" = "1"); """
+                def (tablet1Id, tablet1Backend) = getTabletAndBackend(table1, 
0)
+                def (tablet2Id, tablet2Backend) = getTabletAndBackend(table1, 
1)
+
+                def tableId = getTableId(tablet1Id)
+
+                sql "insert into ${table1} values(1,1,1);" // ver=2
+                qt_1_1 "select * from ${table1} order by k1;"
+
+                // inject error to ordinary sync_rowsets calls
+                
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
 ["table_id": tableId])
+
+                // 1. test that after turn on the notify feature, rowsets will 
be visible on BE without sync_rowsets
+                sql "insert into ${table1} values(1,1,1);" // ver=3
+                sql "insert into ${table1} values(2,2,2);" // ver=4
+                sql "insert into ${table1} values(3,3,3);" // ver=5
+                sql "insert into ${table1} values(3,3,3);" // ver=6
+                sleep(500)
+                assert getTabletRowsets(tablet1Id).size() == 6
+                assert getTabletRowsets(tablet2Id).size() == 6
+                qt_1_2 "select * from ${table1} order by k1;"
+                assert getTabletRowsets(tablet1Id).size() == 6
+                assert getTabletRowsets(tablet2Id).size() == 6
+            } catch (Exception e) {
+                logger.info(e.getMessage())
+                throw e
+            } finally {
+                GetDebugPoint().clearDebugPointsForAllFEs()
+                GetDebugPoint().clearDebugPointsForAllBEs()
+            }
+        }
+    }
+}
diff --git 
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.groovy
 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.groovy
new file mode 100644
index 00000000000..3b3754d330b
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_notify_be_after_txn_commit.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_cloud_mow_notify_be_after_txn_commit", "nonConcurrent") {
+    if (!isCloudMode()) {
+        return
+    }
+
+    GetDebugPoint().clearDebugPointsForAllFEs()
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    def customFeConfig = [
+        enable_notify_be_after_load_txn_commit: true
+    ]
+    def customBeConfig = [
+        enable_cloud_make_rs_visible_on_be : true,
+        cloud_mow_sync_rowsets_when_load_txn_begin : false
+    ]
+
+    setFeConfigTemporary(customFeConfig) {
+        setBeConfigTemporary(customBeConfig) {
+            try {
+                def table1 = "test_cloud_mow_notify_be_after_txn_commit"
+                sql "DROP TABLE IF EXISTS ${table1} FORCE;"
+                sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+                            `k1` int NOT NULL,
+                            `c1` int,
+                            `c2` int
+                            )UNIQUE KEY(k1)
+                        DISTRIBUTED BY HASH(k1) BUCKETS 1
+                        PROPERTIES (
+                            "enable_unique_key_merge_on_write" = "true",
+                            "disable_auto_compaction" = "true",
+                            "replication_num" = "1"); """
+                def backends = sql_return_maparray('show backends')
+                def tabletStats = sql_return_maparray("show tablets from 
${table1};")
+                assert tabletStats.size() == 1
+                def tabletId = tabletStats[0].TabletId
+                def tabletBackendId = tabletStats[0].BackendId
+                def tabletBackend
+                for (def be : backends) {
+                    if (be.BackendId == tabletBackendId) {
+                        tabletBackend = be
+                        break;
+                    }
+                }
+                logger.info("tablet ${tabletId} on backend 
${tabletBackend.Host} with backendId=${tabletBackend.BackendId}");
+
+                sql "insert into ${table1} values(1,1,1),(2,1,1),(3,1,1);"
+                qt_1 "select * from ${table1} order by k1;"
+
+                // inject error to ordinary sync_rowsets calls
+                
GetDebugPoint().enableDebugPointForAllBEs("CloudMetaMgr::sync_tablet_rowsets.before.inject_error",
 ["tablet_id": tabletId])
+
+                sql "insert into ${table1} values(1,10,10),(4,10,10);"
+                sql "insert into ${table1} 
values(2,20,20),(5,20,20),(1,20,20);"
+                sql "insert into ${table1} 
values(3,30,30),(6,30,30),(5,30,30);"
+
+                qt_2 "select * from ${table1} order by k1;"
+
+            } catch (Exception e) {
+                logger.info(e.getMessage())
+                throw e
+            } finally {
+                GetDebugPoint().clearDebugPointsForAllFEs()
+                GetDebugPoint().clearDebugPointsForAllBEs()
+            }
+        }
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to