This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 09609dd2db8 [feature](merge-cloud) support MoW table in cloud mode 
(#31317)
09609dd2db8 is described below

commit 09609dd2db8c0ffd40c9d79087e853d7aefbad67
Author: Xin Liao <[email protected]>
AuthorDate: Sat Mar 2 08:36:53 2024 +0800

    [feature](merge-cloud) support MoW table in cloud mode (#31317)
---
 be/src/agent/agent_server.cpp                      |   5 +-
 be/src/agent/task_worker_pool.cpp                  |  32 +++
 be/src/agent/task_worker_pool.h                    |   2 +
 be/src/cloud/cloud_delete_task.cpp                 |  10 +-
 be/src/cloud/cloud_delta_writer.cpp                |   4 +
 be/src/cloud/cloud_delta_writer.h                  |   2 +
 .../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 168 +++++++++++++
 .../cloud/cloud_engine_calc_delete_bitmap_task.h   |  72 ++++++
 be/src/cloud/cloud_rowset_builder.cpp              |  30 ++-
 be/src/cloud/cloud_rowset_builder.h                |   2 +
 be/src/cloud/cloud_rowset_writer.cpp               |   5 -
 be/src/cloud/cloud_rowset_writer.h                 |   3 -
 be/src/cloud/cloud_storage_engine.cpp              |  11 +
 be/src/cloud/cloud_storage_engine.h                |   9 +
 be/src/cloud/cloud_tablet.cpp                      |  67 +++++
 be/src/cloud/cloud_tablet.h                        |  13 +
 be/src/cloud/cloud_tablets_channel.cpp             |  11 +-
 be/src/cloud/cloud_txn_delete_bitmap_cache.cpp     | 190 ++++++++++++++
 be/src/cloud/cloud_txn_delete_bitmap_cache.h       |  95 +++++++
 be/src/common/config.cpp                           |   4 +
 be/src/common/config.h                             |   4 +
 be/src/olap/base_tablet.cpp                        | 201 +++++++++++++++
 be/src/olap/base_tablet.h                          |  18 ++
 be/src/olap/rowset/beta_rowset_writer.cpp          |   2 +-
 be/src/olap/rowset/beta_rowset_writer.h            |   5 +-
 be/src/olap/rowset/rowset.cpp                      |   4 +
 be/src/olap/rowset/rowset.h                        |   1 +
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |   5 -
 .../rowset/segment_v2/vertical_segment_writer.cpp  |   9 -
 be/src/olap/rowset_builder.cpp                     |   2 +-
 be/src/olap/rowset_builder.h                       |   4 +-
 be/src/olap/tablet.cpp                             | 211 ++--------------
 be/src/olap/tablet.h                               |  17 +-
 be/src/olap/tablet_fwd.h                           |   2 +
 be/src/runtime/memory/cache_policy.h               |   3 +
 .../org/apache/doris/analysis/CreateTableStmt.java |   5 -
 .../main/java/org/apache/doris/catalog/Table.java  |  30 ++-
 .../transaction/CloudGlobalTransactionMgr.java     | 278 ++++++++++++++++++++-
 .../org/apache/doris/common/InternalErrorCode.java |   5 +-
 .../apache/doris/common/util/MetaLockUtils.java    |  24 ++
 .../apache/doris/common/util/PropertyAnalyzer.java |   2 +-
 .../java/org/apache/doris/master/MasterImpl.java   |  31 ++-
 .../trees/plans/commands/info/CreateTableInfo.java |   6 -
 .../java/org/apache/doris/task/AgentBatchTask.java |  10 +
 .../apache/doris/task/CalcDeleteBitmapTask.java    | 119 +++++++++
 .../cloud_p0/conf/regression-conf-custom.groovy    |   6 +-
 46 files changed, 1478 insertions(+), 261 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 269ee490224..fc63c1ff837 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -193,12 +193,15 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& 
engine, ExecEnv* exec_
             "PUSH", config::delete_worker_count,
             [&engine](auto&& task) { cloud_push_callback(engine, task); });
     // TODO(plat1ko): SUBMIT_TABLE_COMPACTION worker
-    // TODO(plat1ko): CALCULATE_DELETE_BITMAP worker
 
     _workers[TTaskType::ALTER] = std::make_unique<TaskWorkerPool>(
             "ALTER_TABLE", config::alter_tablet_worker_count,
             [&engine](auto&& task) { return 
alter_cloud_tablet_callback(engine, task); });
 
+    _workers[TTaskType::CALCULATE_DELETE_BITMAP] = 
std::make_unique<TaskWorkerPool>(
+            "CALC_DBM_TASK", config::calc_delete_bitmap_worker_count,
+            [&engine](auto&& task) { return 
calc_delete_bimtap_callback(engine, task); });
+
     _report_workers.push_back(std::make_unique<ReportWorker>(
             "REPORT_TASK", _master_info, config::report_task_interval_seconds,
             [&master_info = _master_info] { report_task_callback(master_info); 
}));
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index c4fc258552b..af4eade1b3c 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -45,6 +45,7 @@
 
 #include "agent/utils.h"
 #include "cloud/cloud_delete_task.h"
+#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
 #include "cloud/cloud_schema_change_job.h"
 #include "common/config.h"
 #include "common/logging.h"
@@ -1864,4 +1865,35 @@ void storage_medium_migrate_callback(StorageEngine& 
engine, const TAgentTaskRequ
     remove_task_info(req.task_type, req.signature);
 }
 
+// Agent-task callback for CALCULATE_DELETE_BITMAP requests in cloud mode.
+// Runs a CloudEngineCalcDeleteBitmapTask for the request, then reports the resulting
+// status and the list of failed tablets through finish_task().
+void calc_delete_bimtap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) {
+    const auto& calc_req = req.calc_delete_bitmap_req;
+    std::vector<TTabletId> failed_tablets;
+    std::vector<TTabletId> finished_tablets;
+
+    CloudEngineCalcDeleteBitmapTask task(engine, calc_req, &failed_tablets, &finished_tablets);
+    Status result = task.execute();
+
+    if (!result) {
+        DorisMetrics::instance()->publish_task_failed_total->increment(1);
+        LOG_WARNING("failed to calculate delete bitmap")
+                .tag("signature", req.signature)
+                .tag("transaction_id", calc_req.transaction_id)
+                .tag("error_tablets_num", failed_tablets.size())
+                .error(result);
+    }
+
+    // The finish request is sent for both success and failure.
+    TFinishTaskRequest finish_req;
+    result.to_thrift(&finish_req.task_status);
+    finish_req.__set_backend(BackendOptions::get_local_backend());
+    finish_req.__set_task_type(req.task_type);
+    finish_req.__set_signature(req.signature);
+    finish_req.__set_report_version(s_report_version);
+    finish_req.__set_error_tablet_ids(failed_tablets);
+
+    finish_task(finish_req);
+    remove_task_info(req.task_type, req.signature);
+}
+
 } // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 531a64655a1..5c546582576 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -180,4 +180,6 @@ void report_disk_callback(StorageEngine& engine, const 
TMasterInfo& master_info)
 
 void report_tablet_callback(StorageEngine& engine, const TMasterInfo& 
master_info);
 
+void calc_delete_bimtap_callback(CloudStorageEngine& engine, const 
TAgentTaskRequest& req);
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_delete_task.cpp 
b/be/src/cloud/cloud_delete_task.cpp
index 2e3913533c3..7895799f9be 100644
--- a/be/src/cloud/cloud_delete_task.cpp
+++ b/be/src/cloud/cloud_delete_task.cpp
@@ -83,7 +83,15 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, 
const TPushReq& requ
     tablet->fetch_add_approximate_num_rowsets(1);
     tablet->fetch_add_approximate_cumu_num_rowsets(1);
 
-    // TODO(liaoxin): set_tablet_txn_info if enable_unique_key_merge_on_write
+    // TODO(liaoxin) delete operator don't send calculate delete bitmap task 
from fe,
+    //  then we don't need to set_txn_related_delete_bitmap here.
+    if (tablet->enable_unique_key_merge_on_write()) {
+        DeleteBitmapPtr delete_bitmap = 
std::make_shared<DeleteBitmap>(tablet->tablet_id());
+        RowsetIdUnorderedSet rowset_ids;
+        engine.txn_delete_bitmap_cache().set_tablet_txn_info(
+                request.transaction_id, tablet->tablet_id(), delete_bitmap, 
rowset_ids, rowset,
+                request.timeout, nullptr);
+    }
 
     return Status::OK();
 }
diff --git a/be/src/cloud/cloud_delta_writer.cpp 
b/be/src/cloud/cloud_delta_writer.cpp
index 9d549997cd7..f0b148448f7 100644
--- a/be/src/cloud/cloud_delta_writer.cpp
+++ b/be/src/cloud/cloud_delta_writer.cpp
@@ -114,4 +114,8 @@ Status CloudDeltaWriter::commit_rowset() {
     return _engine.meta_mgr().commit_rowset(*rowset_meta(), true);
 }
 
+// Delegates to CloudRowsetBuilder::set_txn_related_delete_bitmap() on this writer's builder.
+Status CloudDeltaWriter::set_txn_related_delete_bitmap() {
+    CloudRowsetBuilder* builder = rowset_builder();
+    return builder->set_txn_related_delete_bitmap();
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_delta_writer.h 
b/be/src/cloud/cloud_delta_writer.h
index 61a025130f0..c4153a075db 100644
--- a/be/src/cloud/cloud_delta_writer.h
+++ b/be/src/cloud/cloud_delta_writer.h
@@ -51,6 +51,8 @@ public:
 
     Status commit_rowset();
 
+    Status set_txn_related_delete_bitmap();
+
 private:
     // Convert `_rowset_builder` from `BaseRowsetBuilder` to 
`CloudRowsetBuilder`
     CloudRowsetBuilder* rowset_builder();
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp 
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
new file mode 100644
index 00000000000..2ff9b8e91ba
--- /dev/null
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -0,0 +1,168 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_engine_calc_delete_bitmap_task.h"
+
+#include <memory>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_tablet.h"
+#include "common/status.h"
+#include "olap/base_tablet.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset.h"
+#include "olap/tablet_fwd.h"
+#include "olap/tablet_meta.h"
+#include "olap/txn_manager.h"
+#include "olap/utils.h"
+
+namespace doris {
+
+// Stores the engine, the request and the two caller-owned output vectors.
+// The request is kept by reference and the vectors by raw pointer, so all of them must
+// outlive this task.
+CloudEngineCalcDeleteBitmapTask::CloudEngineCalcDeleteBitmapTask(
+        CloudStorageEngine& engine, const TCalcDeleteBitmapRequest& 
cal_delete_bitmap_req,
+        std::vector<TTabletId>* error_tablet_ids, std::vector<TTabletId>* 
succ_tablet_ids)
+        : _engine(engine),
+          _cal_delete_bitmap_req(cal_delete_bitmap_req),
+          _error_tablet_ids(error_tablet_ids),
+          _succ_tablet_ids(succ_tablet_ids) {}
+
+// Records `tablet_id` as failed and folds `err` into the overall task result.
+// An already recorded error is kept unless it is a DELETE_BITMAP_LOCK_ERROR, which a
+// later error is allowed to replace.
+void CloudEngineCalcDeleteBitmapTask::add_error_tablet_id(int64_t tablet_id, const Status& err) {
+    std::lock_guard<std::mutex> guard(_mutex);
+    _error_tablet_ids->push_back(tablet_id);
+
+    const bool keep_current = !_res.ok() && !_res.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>();
+    if (!keep_current) {
+        _res = err;
+    }
+}
+
+// Records `tablet_id` as successfully handled.
+// The success list is optional: the constructor declares `succ_tablet_ids` with a default
+// of nullptr, so nothing is recorded when the caller did not supply a vector.
+// `_mutex` is taken because tablet tasks report through this method as well.
+void CloudEngineCalcDeleteBitmapTask::add_succ_tablet_id(int64_t tablet_id) {
+    if (_succ_tablet_ids == nullptr) {
+        return;
+    }
+    std::lock_guard<std::mutex> lck(_mutex);
+    _succ_tablet_ids->push_back(tablet_id);
+}
+
+// Submits one CloudTabletCalcDeleteBitmapTask per tablet named in the request and waits
+// for all submitted tasks to finish.
+//
+// Per tablet, before submitting:
+//  - the tablet is looked up and its rowsets are synced; a sync error other than
+//    INVALID_TABLET_STATE is returned to the caller immediately;
+//  - INVALID_TABLET_STATE counts as success and the tablet is skipped;
+//  - the requested version must be exactly max_version + 1, otherwise the tablet is
+//    recorded as failed with DELETE_BITMAP_LOCK_ERROR.
+// A lookup/version/submit failure stops the remaining tablets of the current partition.
+//
+// Returns the aggregated `_res` once all submitted tablet tasks have completed.
+Status CloudEngineCalcDeleteBitmapTask::execute() {
+    int64_t transaction_id = _cal_delete_bitmap_req.transaction_id;
+    OlapStopWatch watch;
+    VLOG_NOTICE << "begin to calculate delete bitmap. transaction_id=" << transaction_id;
+    std::unique_ptr<ThreadPoolToken> token =
+            _engine.calc_tablet_delete_bitmap_task_thread_pool()->new_token(
+                    ThreadPool::ExecutionMode::CONCURRENT);
+
+    // Tablet tasks submitted earlier in the loop may already be running on pool threads
+    // and updating `_error_tablet_ids` / `_res` under `_mutex`, so writes made from this
+    // thread must take the same mutex to avoid a data race.
+    auto set_result = [this](const Status& st) {
+        std::lock_guard<std::mutex> lck(_mutex);
+        _res = st;
+    };
+    auto fail_tablet = [this](int64_t failed_tablet_id, const Status& st) {
+        std::lock_guard<std::mutex> lck(_mutex);
+        _error_tablet_ids->push_back(failed_tablet_id);
+        _res = st;
+    };
+
+    for (const auto& partition : _cal_delete_bitmap_req.partitions) {
+        int64_t version = partition.version;
+        for (auto tablet_id : partition.tablet_ids) {
+            auto base_tablet = DORIS_TRY(_engine.get_tablet(tablet_id));
+            std::shared_ptr<CloudTablet> tablet =
+                    std::dynamic_pointer_cast<CloudTablet>(base_tablet);
+            if (tablet == nullptr) {
+                LOG(WARNING) << "can't get tablet when calculate delete bitmap. tablet_id="
+                             << tablet_id;
+                fail_tablet(tablet_id,
+                            Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>(
+                                    "can't get tablet when calculate delete bitmap. tablet_id={}",
+                                    tablet_id));
+                break;
+            }
+
+            Status st = tablet->sync_rowsets();
+            if (!st.ok() && !st.is<ErrorCode::INVALID_TABLET_STATE>()) {
+                return st;
+            }
+            if (st.is<ErrorCode::INVALID_TABLET_STATE>()) [[unlikely]] {
+                add_succ_tablet_id(tablet->tablet_id());
+                LOG(INFO)
+                        << "tablet is under alter process, delete bitmap will be calculated later, "
+                           "tablet_id: "
+                        << tablet->tablet_id() << " txn_id: " << transaction_id
+                        << ", request_version=" << version;
+                continue;
+            }
+            int64_t max_version = tablet->max_version_unlocked();
+            if (version != max_version + 1) {
+                fail_tablet(tablet_id, Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
+                                               "version not continuous"));
+                LOG(WARNING) << "version not continuous, current max version=" << max_version
+                             << ", request_version=" << version
+                             << " tablet_id=" << tablet->tablet_id();
+                break;
+            }
+
+            auto tablet_calc_delete_bitmap_ptr = std::make_shared<CloudTabletCalcDeleteBitmapTask>(
+                    _engine, this, tablet, transaction_id, version);
+            // Capture only the task object; `[=]` would also capture `this` implicitly.
+            auto submit_st = token->submit_func(
+                    [tablet_calc_delete_bitmap_ptr]() { tablet_calc_delete_bitmap_ptr->handle(); });
+            if (!submit_st.ok()) {
+                set_result(submit_st);
+                break;
+            }
+        }
+    }
+    // wait for all finished
+    token->wait();
+
+    LOG(INFO) << "finish to calculate delete bitmap on transaction."
+              << "transaction_id=" << transaction_id << ", cost(us): " << watch.get_elapse_time_us()
+              << ", error_tablet_size=" << _error_tablet_ids->size()
+              << ", res=" << _res.to_string();
+    return _res;
+}
+
+// Binds one tablet to the (transaction, version) it should calculate the delete bitmap for.
+// `engine_task` is stored as a raw pointer and is used by handle() to report the outcome,
+// so it must stay alive until handle() has returned.
+CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
+        CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask* 
engine_task,
+        std::shared_ptr<CloudTablet> tablet, int64_t transaction_id, int64_t 
version)
+        : _engine(engine),
+          _engine_calc_delete_bitmap_task(engine_task),
+          _tablet(tablet),
+          _transaction_id(transaction_id),
+          _version(version) {}
+
+// Calculates the delete bitmap of `_tablet` for `_transaction_id` at `_version`.
+// Steps: fetch the txn info cached for (txn, tablet), stamp the rowset with the target
+// version, run CloudTablet::update_delete_bitmap(), then report success or failure to the
+// owning engine task.
+void CloudTabletCalcDeleteBitmapTask::handle() const {
+    RowsetSharedPtr txn_rowset;
+    DeleteBitmapPtr txn_delete_bitmap;
+    RowsetIdUnorderedSet txn_rowset_ids;
+    std::shared_ptr<PartialUpdateInfo> txn_partial_update_info;
+    int64_t txn_expiration;
+    Status st = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
+            _transaction_id, _tablet->tablet_id(), &txn_rowset, &txn_delete_bitmap,
+            &txn_rowset_ids, &txn_expiration, &txn_partial_update_info);
+    if (st != Status::OK()) {
+        LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet->tablet_id()
+                     << ", txn_id=" << _transaction_id << ", status=" << st;
+        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet->tablet_id(), st);
+        return;
+    }
+
+    // The rowset's version is set to the single version named in the request.
+    txn_rowset->set_version(Version(_version, _version));
+
+    TabletTxnInfo info;
+    info.rowset = txn_rowset;
+    info.delete_bitmap = txn_delete_bitmap;
+    info.rowset_ids = txn_rowset_ids;
+    info.partial_update_info = txn_partial_update_info;
+
+    st = CloudTablet::update_delete_bitmap(_tablet, &info, _transaction_id, txn_expiration);
+    if (st != Status::OK()) {
+        LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << txn_rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id
+                     << ", status=" << st;
+        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet->tablet_id(), st);
+        return;
+    }
+
+    _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet->tablet_id());
+    LOG(INFO) << "calculate delete bitmap successfully on tablet"
+              << ", table_id=" << _tablet->table_id() << ", transaction_id=" << _transaction_id
+              << ", num_rows=" << txn_rowset->num_rows() << ", res=" << st;
+}
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h 
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
new file mode 100644
index 00000000000..514f1b059d5
--- /dev/null
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet.h"
+#include "gen_cpp/AgentService_types.h"
+#include "olap/tablet_fwd.h"
+#include "olap/task/engine_task.h"
+
+namespace doris {
+
+class CloudEngineCalcDeleteBitmapTask;
+
+// Per-tablet unit of work created by CloudEngineCalcDeleteBitmapTask::execute(): it
+// calculates the delete bitmap of a single tablet for one transaction and version.
+class CloudTabletCalcDeleteBitmapTask {
+public:
+    CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
+                                    CloudEngineCalcDeleteBitmapTask* 
engine_task,
+                                    std::shared_ptr<CloudTablet> tablet, 
int64_t transaction_id,
+                                    int64_t version);
+    ~CloudTabletCalcDeleteBitmapTask() = default;
+
+    // Performs the calculation and reports the result to the owning engine task.
+    void handle() const;
+
+private:
+    CloudStorageEngine& _engine;
+    // Not owned; receives add_error_tablet_id()/add_succ_tablet_id() calls from handle().
+    CloudEngineCalcDeleteBitmapTask* _engine_calc_delete_bitmap_task;
+
+    std::shared_ptr<CloudTablet> _tablet;
+    int64_t _transaction_id;
+    // Version assigned to the transaction's rowset before the bitmap is calculated.
+    int64_t _version;
+};
+
+// Engine task that handles one TCalcDeleteBitmapRequest: execute() fans the request out
+// into per-tablet CloudTabletCalcDeleteBitmapTask instances and collects their results.
+class CloudEngineCalcDeleteBitmapTask : public EngineTask {
+public:
+    // `error_tablet_ids` and `succ_tablet_ids` are caller-owned output vectors;
+    // `succ_tablet_ids` defaults to nullptr.
+    CloudEngineCalcDeleteBitmapTask(CloudStorageEngine& engine,
+                                    const TCalcDeleteBitmapRequest& 
cal_delete_bitmap_req,
+                                    std::vector<TTabletId>* error_tablet_ids,
+                                    std::vector<TTabletId>* succ_tablet_ids = 
nullptr);
+    Status execute() override;
+
+    // Result reporting hooks used by the per-tablet tasks; both take `_mutex`.
+    void add_error_tablet_id(int64_t tablet_id, const Status& err);
+    void add_succ_tablet_id(int64_t tablet_id);
+
+private:
+    CloudStorageEngine& _engine;
+    // Held by reference: the request must outlive this task.
+    const TCalcDeleteBitmapRequest& _cal_delete_bitmap_req;
+    // Locked by add_error_tablet_id()/add_succ_tablet_id() while updating the members below.
+    std::mutex _mutex;
+    std::vector<TTabletId>* _error_tablet_ids;
+    std::vector<TTabletId>* _succ_tablet_ids;
+
+    // Aggregated status returned by execute().
+    Status _res;
+};
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_rowset_builder.cpp 
b/be/src/cloud/cloud_rowset_builder.cpp
index 2ac7f2e3337..13f49ae35a5 100644
--- a/be/src/cloud/cloud_rowset_builder.cpp
+++ b/be/src/cloud/cloud_rowset_builder.cpp
@@ -34,8 +34,11 @@ CloudRowsetBuilder::~CloudRowsetBuilder() = default;
 Status CloudRowsetBuilder::init() {
     _tablet = DORIS_TRY(_engine.get_tablet(_req.tablet_id));
 
-    // TODO(plat1ko): get rowset ids snapshot to calculate delete bitmap
-
+    std::shared_ptr<MowContext> mow_context;
+    if (_tablet->enable_unique_key_merge_on_write()) {
+        
RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablet)->sync_rowsets());
+        RETURN_IF_ERROR(init_mow_context(mow_context));
+    }
     RETURN_IF_ERROR(check_tablet_version_count());
 
     // build tablet schema in request level
@@ -55,8 +58,7 @@ Status CloudRowsetBuilder::init() {
     context.index_id = _req.index_id;
     context.tablet = _tablet;
     context.write_type = DataWriteType::TYPE_DIRECT;
-    // TODO(plat1ko):
-    // context.mow_context = mow_context;
+    context.mow_context = mow_context;
     context.write_file_cache = _req.write_file_cache;
     context.partial_update_info = _partial_update_info;
     // New loaded data is always written to latest shared storage
@@ -104,4 +106,24 @@ const RowsetMetaSharedPtr& 
CloudRowsetBuilder::rowset_meta() {
     return _rowset_writer->rowset_meta();
 }
 
+// Registers this load's delete bitmap, rowset id set, rowset and partial-update info for
+// the transaction in the engine's txn delete bitmap cache.
+// Does nothing for tablets without unique-key merge-on-write. When the correctness check
+// is enabled and the rowset is non-empty, a failed check is returned and nothing is cached.
+Status CloudRowsetBuilder::set_txn_related_delete_bitmap() {
+    if (!_tablet->enable_unique_key_merge_on_write()) {
+        return Status::OK();
+    }
+
+    const bool need_check =
+            config::enable_merge_on_write_correctness_check && _rowset->num_rows() != 0;
+    if (need_check) {
+        auto check_st = _tablet->check_delete_bitmap_correctness(
+                _delete_bitmap, _rowset->end_version() - 1, _req.txn_id, _rowset_ids);
+        if (!check_st.ok()) {
+            LOG(WARNING) << fmt::format(
+                    "[tablet_id:{}][txn_id:{}][load_id:{}][partition_id:{}] "
+                    "delete bitmap correctness check failed in commit phase!",
+                    _req.tablet_id, _req.txn_id, UniqueId(_req.load_id).to_string(),
+                    _req.partition_id);
+            return check_st;
+        }
+    }
+
+    _engine.txn_delete_bitmap_cache().set_tablet_txn_info(
+            _req.txn_id, _tablet->tablet_id(), _delete_bitmap, _rowset_ids, _rowset,
+            _req.txn_expiration, _partial_update_info);
+    return Status::OK();
+}
 } // namespace doris
diff --git a/be/src/cloud/cloud_rowset_builder.h 
b/be/src/cloud/cloud_rowset_builder.h
index 0ce2cfa514d..05e24e66382 100644
--- a/be/src/cloud/cloud_rowset_builder.h
+++ b/be/src/cloud/cloud_rowset_builder.h
@@ -37,6 +37,8 @@ public:
 
     const RowsetMetaSharedPtr& rowset_meta();
 
+    Status set_txn_related_delete_bitmap();
+
 private:
     // Convert `_tablet` from `BaseTablet` to `CloudTablet`
     CloudTablet* cloud_tablet();
diff --git a/be/src/cloud/cloud_rowset_writer.cpp 
b/be/src/cloud/cloud_rowset_writer.cpp
index f7e4f006194..98f7752b660 100644
--- a/be/src/cloud/cloud_rowset_writer.cpp
+++ b/be/src/cloud/cloud_rowset_writer.cpp
@@ -115,9 +115,4 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
     return Status::OK();
 }
 
-Status CloudRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
-    // TODO(plat1ko)
-    return Status::NotSupported("CloudRowsetWriter::_generate_delete_bitmap is 
not implemented");
-}
-
 } // namespace doris
diff --git a/be/src/cloud/cloud_rowset_writer.h 
b/be/src/cloud/cloud_rowset_writer.h
index c8f0ed72d15..1bb2b3d38b8 100644
--- a/be/src/cloud/cloud_rowset_writer.h
+++ b/be/src/cloud/cloud_rowset_writer.h
@@ -30,9 +30,6 @@ public:
     Status init(const RowsetWriterContext& rowset_writer_context) override;
 
     Status build(RowsetSharedPtr& rowset) override;
-
-private:
-    Status _generate_delete_bitmap(int32_t segment_id) override;
 };
 
 } // namespace doris
diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 6eea0972d01..d82e6503c66 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -28,6 +28,7 @@
 #include "cloud/cloud_full_compaction.h"
 #include "cloud/cloud_meta_mgr.h"
 #include "cloud/cloud_tablet_mgr.h"
+#include "cloud/cloud_txn_delete_bitmap_cache.h"
 #include "cloud/config.h"
 #include "io/fs/s3_file_system.h"
 #include "olap/cumulative_compaction_policy.h"
@@ -99,6 +100,10 @@ Status CloudStorageEngine::open() {
     _calc_delete_bitmap_executor = 
std::make_unique<CalcDeleteBitmapExecutor>();
     _calc_delete_bitmap_executor->init();
 
+    _txn_delete_bitmap_cache =
+            
std::make_unique<CloudTxnDeleteBitmapCache>(config::delete_bitmap_agg_cache_capacity);
+    RETURN_IF_ERROR(_txn_delete_bitmap_cache->init());
+
     return Status::OK();
 }
 
@@ -150,6 +155,12 @@ Status CloudStorageEngine::start_bg_threads() {
             &_evict_quering_rowset_thread));
     LOG(INFO) << "evict quering thread started";
 
+    // add calculate tablet delete bitmap task thread pool
+    RETURN_IF_ERROR(ThreadPoolBuilder("TabletCalDeleteBitmapThreadPool")
+                            .set_min_threads(1)
+                            
.set_max_threads(config::calc_tablet_delete_bitmap_task_max_thread)
+                            
.build(&_calc_tablet_delete_bitmap_task_thread_pool));
+
     // TODO(plat1ko): check_bucket_enable_versioning_thread
 
     // compaction tasks producer thread
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index c2841ac9ded..61e4ca9859b 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -24,7 +24,9 @@
 //#include "cloud/cloud_full_compaction.h"
 #include "cloud/cloud_cumulative_compaction_policy.h"
 #include "cloud/cloud_tablet.h"
+#include "cloud_txn_delete_bitmap_cache.h"
 #include "olap/storage_engine.h"
+#include "util/threadpool.h"
 
 namespace doris {
 namespace cloud {
@@ -59,6 +61,11 @@ public:
 
     CloudTabletMgr& tablet_mgr() { return *_tablet_mgr; }
 
+    CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() { return 
*_txn_delete_bitmap_cache; }
+    std::unique_ptr<ThreadPool>& calc_tablet_delete_bitmap_task_thread_pool() {
+        return _calc_tablet_delete_bitmap_task_thread_pool;
+    }
+
     io::FileSystemSPtr latest_fs() const {
         std::lock_guard lock(_latest_fs_mtx);
         return _latest_fs;
@@ -97,6 +104,8 @@ private:
 
     std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr;
     std::unique_ptr<CloudTabletMgr> _tablet_mgr;
+    std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache;
+    std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool;
 
     // FileSystem with latest shared storage info, new data will be written to 
this fs.
     mutable std::mutex _latest_fs_mtx;
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 7bd05c8720b..8e3932f7d24 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -24,20 +24,25 @@
 #include <rapidjson/stringbuffer.h>
 
 #include <atomic>
+#include <memory>
 
 #include "cloud/cloud_meta_mgr.h"
 #include "cloud/cloud_storage_engine.h"
 #include "io/cache/block/block_file_cache_factory.h"
 #include "olap/olap_define.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/rowset/rowset_fwd.h"
 #include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
+#include "olap/txn_manager.h"
 
 namespace doris {
 using namespace ErrorCode;
 
+static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
+
 CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr 
tablet_meta)
         : BaseTablet(std::move(tablet_meta)), _engine(engine) {
     _tablet_path = remote_tablet_path(_tablet_meta->tablet_id());
@@ -351,6 +356,36 @@ Result<std::unique_ptr<RowsetWriter>> 
CloudTablet::create_rowset_writer(
     return RowsetFactory::create_rowset_writer(_engine, context, vertical);
 }
 
+// Creates a transient rowset writer that reuses the rowset id of `rowset`, so that what
+// it writes can afterwards be merged into that original rowset.
+// The writer works on a copy of the rowset's tablet schema, and its segment ids start at
+// rowset.num_segments(), i.e. after the segments the rowset already has.
+// Returns the writer, or the error produced by RowsetFactory::create_rowset_writer().
+Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
+        const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
+        int64_t txn_expiration) {
+    RowsetWriterContext context;
+    context.rowset_state = PREPARED;
+    context.segments_overlap = OVERLAPPING;
+    context.tablet_schema = std::make_shared<TabletSchema>();
+    context.tablet_schema->copy_from(*(rowset.tablet_schema()));
+    context.newest_write_timestamp = UnixSeconds();
+    // tablet_id is set once, from tablet_id(); the earlier assignment from table_id()
+    // stored the wrong id and was overwritten anyway.
+    context.tablet_id = tablet_id();
+    context.enable_segcompaction = false;
+    context.write_type = DataWriteType::TYPE_DIRECT;
+    context.partial_update_info = std::move(partial_update_info);
+    context.is_transient_rowset_writer = true;
+    context.rowset_id = rowset.rowset_id();
+    context.index_id = index_id();
+    context.partition_id = partition_id();
+    context.rowset_dir = remote_tablet_path(tablet_id());
+    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
+    context.txn_expiration = txn_expiration;
+    return RowsetFactory::create_rowset_writer(_engine, context, false)
+            .transform([&rowset](auto&& writer) {
+                writer->set_segment_start_id(rowset.num_segments());
+                return writer;
+            });
+}
+
 int64_t CloudTablet::get_cloud_base_compaction_score() const {
     return _approximate_num_rowsets.load(std::memory_order_relaxed) -
            _approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
@@ -476,4 +511,36 @@ std::vector<RowsetSharedPtr> 
CloudTablet::pick_candidate_rowsets_to_full_compact
     return pick_candidate_rowsets_to_single_replica_compaction();
 }
 
+// Returns the delete-bitmap calculation executor provided by the cloud storage engine.
+CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {
+    CalcDeleteBitmapExecutor* executor = _engine.calc_delete_bitmap_executor();
+    return executor;
+}
+
+// Saves the delete bitmap calculated for transaction `txn_id`:
+//  1. refreshes the cached txn info with `delete_bitmap` and `cur_rowset_ids`;
+//  2. for a partial update whose writer produced rows, calls update_tmp_rowset() with the
+//     rowset's meta;
+//  3. copies every bitmap entry under the rowset's start version and passes the result to
+//     meta_mgr().update_delete_bitmap().
+// Returns the first error from the meta manager, otherwise OK.
+Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
+                                       DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
+                                       const RowsetIdUnorderedSet& cur_rowset_ids) {
+    RowsetSharedPtr txn_rowset = txn_info->rowset;
+    const int64_t target_version = txn_rowset->start_version();
+
+    // update delete bitmap info, in order to avoid recalculation when trying again
+    _engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap,
+                                                             cur_rowset_ids);
+
+    // `rowset_writer` is only consulted for partial updates (evaluation order matters).
+    if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) {
+        if (rowset_writer->num_rows() > 0) {
+            const auto& tmp_rowset_meta = txn_rowset->rowset_meta();
+            RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*tmp_rowset_meta));
+        }
+    }
+
+    // Re-key each entry: keep the first two key components, replace the version.
+    auto versioned_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
+    for (const auto& [bitmap_key, bitmap] : delete_bitmap->delete_bitmap) {
+        versioned_bitmap->merge(
+                {std::get<0>(bitmap_key), std::get<1>(bitmap_key), target_version}, bitmap);
+    }
+
+    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
+            *this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, versioned_bitmap.get()));
+
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index a9fce6dda5f..37ba38c6db3 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -17,7 +17,10 @@
 
 #pragma once
 
+#include <memory>
+
 #include "olap/base_tablet.h"
+#include "olap/partial_update_info.h"
 
 namespace doris {
 
@@ -172,6 +175,16 @@ public:
     std::mutex& get_base_compaction_lock() { return _base_compaction_lock; }
     std::mutex& get_cumulative_compaction_lock() { return 
_cumulative_compaction_lock; }
 
+    Result<std::unique_ptr<RowsetWriter>> create_transient_rowset_writer(
+            const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> 
partial_update_info,
+            int64_t txn_expiration = 0) override;
+
+    CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
+
+    Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
+                              DeleteBitmapPtr delete_bitmap, RowsetWriter* 
rowset_writer,
+                              const RowsetIdUnorderedSet& cur_rowset_ids) 
override;
+
     int64_t last_sync_time_s = 0;
     int64_t last_load_time_ms = 0;
     int64_t last_base_compaction_success_time_ms = 0;
diff --git a/be/src/cloud/cloud_tablets_channel.cpp 
b/be/src/cloud/cloud_tablets_channel.cpp
index 07c247ac809..046916aa9a0 100644
--- a/be/src/cloud/cloud_tablets_channel.cpp
+++ b/be/src/cloud/cloud_tablets_channel.cpp
@@ -235,7 +235,16 @@ Status CloudTabletsChannel::close(LoadChannel* parent, 
const PTabletWriterAddBlo
         }
     }
 
-    // TODO(plat1ko): 6. set txn related delete bitmap if necessary
+    // 6. set txn related delete bitmap if necessary
+    for (auto it = writers_to_commit.begin(); it != writers_to_commit.end();) {
+        auto st = (*it)->set_txn_related_delete_bitmap();
+        if (!st.ok()) {
+            _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
+            _close_status = std::move(st);
+            return _close_status;
+        }
+        it++;
+    }
 
     tablet_vec->Reserve(writers_to_commit.size());
     for (auto* writer : writers_to_commit) {
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp 
b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
new file mode 100644
index 00000000000..08bc035770a
--- /dev/null
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_txn_delete_bitmap_cache.h"
+
+#include <fmt/core.h>
+
+#include <chrono>
+#include <memory>
+#include <shared_mutex>
+
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "olap/olap_common.h"
+#include "olap/tablet_meta.h"
+
+namespace doris {
+
+// Builds a size-bounded LRU cache of `size_in_bytes` for txn delete bitmaps and arms the
+// stop latch (count 1) that the destructor counts down to stop the clean thread.
+// NOTE(review): 86400 and 4 are presumably the stale-sweep interval in seconds and the
+// shard count expected by LRUCachePolicy -- confirm against its constructor.
+CloudTxnDeleteBitmapCache::CloudTxnDeleteBitmapCache(size_t size_in_bytes)
+        : 
LRUCachePolicy(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE, 
size_in_bytes,
+                         LRUCacheType::SIZE, 86400, 4),
+          _stop_latch(1) {}
+
+// Signals the clean thread to stop and waits for it to finish.
+CloudTxnDeleteBitmapCache::~CloudTxnDeleteBitmapCache() {
+    // Counting the latch down makes the clean thread's wait return so its loop can end.
+    _stop_latch.count_down();
+    // `_clean_thread` is only assigned by a successful init(). If init() was never called or
+    // failed, the pointer is still null and joining it would dereference null.
+    if (_clean_thread) {
+        _clean_thread->join();
+    }
+}
+
+// Starts the background thread that runs `_clean_thread_callback()`.
+// Returns the status of thread creation; a failure is logged as a warning before it is
+// returned to the caller.
+Status CloudTxnDeleteBitmapCache::init() {
+    auto run_cleaner = [this]() { this->_clean_thread_callback(); };
+    auto create_status = Thread::create("CloudTxnDeleteBitmapCache", "clean_txn_dbm_thread",
+                                        run_cleaner, &_clean_thread);
+    if (create_status.ok()) {
+        return create_status;
+    }
+    LOG(WARNING) << "failed to create thread for CloudTxnDeleteBitmapCache, error: "
+                 << create_status;
+    return create_status;
+}
+
+// Looks up the state recorded for (`transaction_id`, `tablet_id`).
+//
+// `rowset`, `txn_expiration` and `partial_update_info` are read from `_txn_map` under the
+// shared lock. `delete_bitmap` and `rowset_ids` are read from the LRU cache entry keyed by
+// "<transaction_id>/<tablet_id>".
+//
+// Returns NOT_FOUND when `_txn_map` has no entry for the key. When the map entry exists but
+// the cache entry does not, returns OK with a fresh empty `delete_bitmap` and an empty
+// `rowset_ids`.
+Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
+        TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr* rowset,
+        DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
+        std::shared_ptr<PartialUpdateInfo>* partial_update_info) {
+    {
+        std::shared_lock<std::shared_mutex> rlock(_rwlock);
+        TxnKey txn_key(transaction_id, tablet_id);
+        auto iter = _txn_map.find(txn_key);
+        if (iter == _txn_map.end()) {
+            return Status::Error<ErrorCode::NOT_FOUND, false>(
+                    "not found txn info, tablet_id={}, transaction_id={}", tablet_id,
+                    transaction_id);
+        }
+        *rowset = iter->second.rowset;
+        *txn_expiration = iter->second.txn_expiration;
+        *partial_update_info = iter->second.partial_update_info;
+    }
+    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
+    CacheKey cache_key(key_str);
+    Cache::Handle* handle = cache()->lookup(cache_key);
+
+    // The cache stores values as void*; they are always inserted as DeleteBitmapCacheValue.
+    DeleteBitmapCacheValue* val =
+            handle == nullptr ? nullptr
+                              : static_cast<DeleteBitmapCacheValue*>(cache()->value(handle));
+    if (val) {
+        *delete_bitmap = val->delete_bitmap;
+        *rowset_ids = val->rowset_ids;
+        // must call release handle to reduce the reference count,
+        // otherwise there will be memory leak
+        cache()->release(handle);
+    } else {
+        LOG_INFO("cache missed when get delete bitmap")
+                .tag("txn_id", transaction_id)
+                .tag("tablet_id", tablet_id);
+        // Because the rowset_ids become empty, all delete bitmap
+        // will be recalculated in CalcDeleteBitmapTask.
+        // Clear the out-parameter explicitly so this holds even if the caller passed in a
+        // set that already had ids in it.
+        rowset_ids->clear();
+        *delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
+    }
+    return Status::OK();
+}
+
+// Records the state of (`transaction_id`, `tablet_id`).
+//
+// The rowset, expiration and partial update info go into `_txn_map`, and the expiration is
+// also indexed in `_expiration_txn`. The delete bitmap and rowset ids go into the LRU cache
+// under the key "<transaction_id>/<tablet_id>".
+//
+// A non-positive `txn_expiration` is replaced by "now + 120" in seconds since the epoch.
+void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
+        TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr delete_bitmap,
+        const RowsetIdUnorderedSet& rowset_ids, RowsetSharedPtr rowset, int64_t txn_expiration,
+        std::shared_ptr<PartialUpdateInfo> partial_update_info) {
+    if (txn_expiration <= 0) {
+        const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
+        txn_expiration =
+                std::chrono::duration_cast<std::chrono::seconds>(since_epoch).count() + 120;
+    }
+    {
+        std::unique_lock<std::shared_mutex> wlock(_rwlock);
+        TxnKey txn_key(transaction_id, tablet_id);
+        _txn_map[txn_key] = TxnVal(rowset, txn_expiration, std::move(partial_update_info));
+        _expiration_txn.emplace(txn_expiration, txn_key);
+    }
+
+    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
+    CacheKey cache_key(key_str);
+
+    // Ownership of `cache_value` passes to the cache, which frees it through `deleter`.
+    auto* cache_value = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
+    auto deleter = [](const CacheKey&, void* value) {
+        delete static_cast<DeleteBitmapCacheValue*>(value); // Just delete to reclaim
+    };
+
+    // Charge = fixed struct size plus the byte size of every bitmap it holds.
+    size_t charge = sizeof(DeleteBitmapCacheValue);
+    for (auto& [bitmap_key, bitmap] : cache_value->delete_bitmap->delete_bitmap) {
+        charge += bitmap.getSizeInBytes();
+    }
+
+    auto* handle = cache()->insert(cache_key, cache_value, charge, deleter, CachePriority::NORMAL);
+    // must call release handle to reduce the reference count,
+    // otherwise there will be memory leak
+    cache()->release(handle);
+    LOG_INFO("set txn related delete bitmap")
+            .tag("txn_id", transaction_id)
+            .tag("expiration", txn_expiration)
+            .tag("tablet_id", tablet_id)
+            .tag("delete_bitmap_size", charge);
+}
+
+// Replaces the cached delete bitmap and rowset ids of (`transaction_id`, `tablet_id`).
+//
+// Only the LRU cache entry keyed by "<transaction_id>/<tablet_id>" is written; `_txn_map`
+// and `_expiration_txn` are not touched.
+void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId transaction_id,
+                                                       int64_t tablet_id,
+                                                       DeleteBitmapPtr delete_bitmap,
+                                                       const RowsetIdUnorderedSet& rowset_ids) {
+    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
+    CacheKey key(key_str);
+
+    // Ownership of `val` passes to the cache, which frees it through `deleter`.
+    auto* val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
+    auto deleter = [](const CacheKey&, void* value) {
+        delete static_cast<DeleteBitmapCacheValue*>(value); // Just delete to reclaim
+    };
+    // Charge = fixed struct size plus the byte size of every bitmap it holds.
+    size_t charge = sizeof(DeleteBitmapCacheValue);
+    for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
+        charge += v.getSizeInBytes();
+    }
+    auto* handle = cache()->insert(key, val, charge, deleter, CachePriority::NORMAL);
+    // must call release handle to reduce the reference count,
+    // otherwise there will be memory leak
+    cache()->release(handle);
+    // The tag is spelled "tablet_id" (it used to be "tablt_id") so that it matches the tag
+    // used by set_tablet_txn_info/get_tablet_txn_info and log searches find all three.
+    LOG_INFO("update txn related delete bitmap")
+            .tag("txn_id", transaction_id)
+            .tag("tablet_id", tablet_id)
+            .tag("delete_bitmap_size", charge);
+}
+
+// Drops every txn entry whose expiration time (seconds since the epoch) has passed.
+//
+// `_expiration_txn` is ordered by expiration, so entries are consumed from the front until
+// the first one that is still in the future. An expiration entry removes the `_txn_map`
+// entry and the cache entry only when the map entry still carries that same expiration;
+// otherwise the key was re-registered with a different expiration and only the outdated
+// expiration record is discarded.
+void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
+    TEST_SYNC_POINT_RETURN_WITH_VOID("CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info");
+    std::unique_lock<std::shared_mutex> wlock(_rwlock);
+    while (!_expiration_txn.empty()) {
+        auto iter = _expiration_txn.begin();
+        const int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
+                                             std::chrono::system_clock::now().time_since_epoch())
+                                             .count();
+        if (iter->first > current_time) {
+            break;
+        }
+        auto txn_iter = _txn_map.find(iter->second);
+        // The map entry may already be gone: registering the same key twice with the same
+        // expiration leaves two expiration records but only one map entry, and the first
+        // record erases it. Dereferencing end() here would be undefined behaviour.
+        if (txn_iter != _txn_map.end() && iter->first == txn_iter->second.txn_expiration) {
+            LOG_INFO("clean expired delete bitmap")
+                    .tag("txn_id", txn_iter->first.txn_id)
+                    .tag("expiration", txn_iter->second.txn_expiration)
+                    .tag("tablet_id", txn_iter->first.tablet_id);
+            // Cache key container; must outlive `cache_key`.
+            std::string key_str =
+                    fmt::format("{}/{}", txn_iter->first.txn_id, txn_iter->first.tablet_id);
+            CacheKey cache_key(key_str);
+            cache()->erase(cache_key);
+            _txn_map.erase(txn_iter);
+        }
+        _expiration_txn.erase(iter);
+    }
+}
+
+// Body of the clean thread: sweep expired entries, then wait up to 300 seconds on the stop
+// latch, and repeat until that wait reports true. The first sweep runs before any waiting.
+void CloudTxnDeleteBitmapCache::_clean_thread_callback() {
+    bool stop_requested = false;
+    while (!stop_requested) {
+        remove_expired_tablet_txn_info();
+        // The timed wait is both the pause between sweeps and the stop check.
+        stop_requested = _stop_latch.wait_for(std::chrono::seconds(300));
+    }
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h 
b/be/src/cloud/cloud_txn_delete_bitmap_cache.h
new file mode 100644
index 00000000000..c84e19a765f
--- /dev/null
+++ b/be/src/cloud/cloud_txn_delete_bitmap_cache.h
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <mutex>
+
+#include "olap/lru_cache.h"
+#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
+#include "olap/rowset/rowset.h"
+#include "olap/tablet_meta.h"
+#include "util/countdown_latch.h"
+
+namespace doris {
+
+// Record transaction related delete bitmaps using a lru cache.
+//
+// Two kinds of state are kept per (txn_id, tablet_id):
+//   - `_txn_map` and `_expiration_txn`, guarded by `_rwlock`, hold the rowset, the
+//     expiration time and the partial update info;
+//   - the inherited LRU cache holds the delete bitmap together with the rowset ids it was
+//     calculated against, under the key "<txn_id>/<tablet_id>".
+// init() starts a background thread that periodically calls
+// remove_expired_tablet_txn_info(); the destructor stops it.
+class CloudTxnDeleteBitmapCache : public LRUCachePolicy {
+public:
+    explicit CloudTxnDeleteBitmapCache(size_t size_in_bytes);
+
+    ~CloudTxnDeleteBitmapCache() override;
+
+    // Starts the background clean thread. Returns the thread-creation status.
+    Status init();
+
+    // Fetches the state recorded for (transaction_id, tablet_id) into the out-parameters.
+    // Returns NOT_FOUND when the txn is unknown. When only the cached bitmap is missing,
+    // returns OK with an empty `delete_bitmap`.
+    Status get_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
+                               RowsetSharedPtr* rowset, DeleteBitmapPtr* delete_bitmap,
+                               RowsetIdUnorderedSet* rowset_ids, int64_t* txn_expiration,
+                               std::shared_ptr<PartialUpdateInfo>* partial_update_info);
+
+    // Records all state for (transaction_id, tablet_id). `txn_expiration` is in seconds
+    // since the epoch; a value <= 0 is replaced by "now + 120 seconds".
+    void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
+                             DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids,
+                             RowsetSharedPtr rowset, int64_t txn_expiration,
+                             std::shared_ptr<PartialUpdateInfo> partial_update_info);
+
+    // Replaces only the cached delete bitmap and rowset ids of (transaction_id, tablet_id).
+    void update_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
+                                DeleteBitmapPtr delete_bitmap,
+                                const RowsetIdUnorderedSet& rowset_ids);
+
+    // Removes every entry whose expiration time has passed.
+    void remove_expired_tablet_txn_info();
+
+private:
+    // Entry point of the clean thread.
+    void _clean_thread_callback();
+
+    // Value stored in the LRU cache.
+    struct DeleteBitmapCacheValue {
+        DeleteBitmapPtr delete_bitmap;
+        // records rowsets calc in commit txn
+        RowsetIdUnorderedSet rowset_ids;
+
+        DeleteBitmapCacheValue(DeleteBitmapPtr delete_bitmap_, const RowsetIdUnorderedSet& ids_)
+                : delete_bitmap(std::move(delete_bitmap_)), rowset_ids(ids_) {}
+    };
+
+    // Key of `_txn_map`; ordered by txn_id, then tablet_id.
+    struct TxnKey {
+        TTransactionId txn_id;
+        int64_t tablet_id;
+        TxnKey(TTransactionId txn_id_, int64_t tablet_id_)
+                : txn_id(txn_id_), tablet_id(tablet_id_) {}
+        auto operator<=>(const TxnKey&) const = default;
+    };
+
+    // Value of `_txn_map`.
+    struct TxnVal {
+        RowsetSharedPtr rowset;
+        // seconds since the epoch
+        int64_t txn_expiration;
+        std::shared_ptr<PartialUpdateInfo> partial_update_info;
+        TxnVal() : txn_expiration(0) {}
+        TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_,
+               std::shared_ptr<PartialUpdateInfo> partial_update_info_)
+                : rowset(std::move(rowset_)),
+                  txn_expiration(txn_expiration_),
+                  partial_update_info(std::move(partial_update_info_)) {}
+    };
+
+    std::map<TxnKey, TxnVal> _txn_map;
+    // expiration time -> txn key, ordered so the earliest expiration comes first
+    std::multimap<int64_t, TxnKey> _expiration_txn;
+    // guards `_txn_map` and `_expiration_txn`
+    std::shared_mutex _rwlock;
+    scoped_refptr<Thread> _clean_thread;
+    // counted down by the destructor to stop the clean thread
+    CountDownLatch _stop_latch;
+};
+
+} // namespace doris
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b30191d9e17..a08fdd5a6cc 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -156,6 +156,10 @@ DEFINE_Int32(tablet_publish_txn_max_thread, "32");
 DEFINE_Int32(publish_version_task_timeout_s, "8");
 // the count of thread to calc delete bitmap
 DEFINE_Int32(calc_delete_bitmap_max_thread, "32");
+// the number of threads for the calc delete bitmap worker, only used in cloud mode
+DEFINE_Int32(calc_delete_bitmap_worker_count, "8");
+// the number of threads for calc tablet delete bitmap tasks, only used in cloud mode
+DEFINE_Int32(calc_tablet_delete_bitmap_task_max_thread, "32");
 // the count of thread to clear transaction task
 DEFINE_Int32(clear_transaction_task_worker_count, "1");
 // the count of thread to delete
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0160919e66d..81685bc1e5c 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -204,6 +204,10 @@ DECLARE_Int32(tablet_publish_txn_max_thread);
 DECLARE_Int32(publish_version_task_timeout_s);
 // the count of thread to calc delete bitmap
 DECLARE_Int32(calc_delete_bitmap_max_thread);
+// the number of threads for the calc delete bitmap worker, only used in cloud mode
+DECLARE_Int32(calc_delete_bitmap_worker_count);
+// the number of threads for calc tablet delete bitmap tasks, only used in cloud mode
+DECLARE_Int32(calc_tablet_delete_bitmap_task_max_thread);
 // the count of thread to clear transaction task
 DECLARE_Int32(clear_transaction_task_worker_count);
 // the count of thread to delete
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 09960fd0c03..c4ce206d8c5 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -18,7 +18,9 @@
 #include "olap/base_tablet.h"
 
 #include <fmt/format.h>
+#include <rapidjson/prettywriter.h>
 
+#include "common/status.h"
 #include "olap/calc_delete_bitmap_executor.h"
 #include "olap/delete_bitmap_calculator.h"
 #include "olap/memtable.h"
@@ -27,8 +29,10 @@
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_reader.h"
 #include "olap/tablet_fwd.h"
+#include "olap/txn_manager.h"
 #include "service/point_query_executor.h"
 #include "util/bvar_helper.h"
+#include "util/debug_points.h"
 #include "util/doris_metrics.h"
 #include "vec/common/schema_util.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -45,6 +49,7 @@ bvar::LatencyRecorder 
g_tablet_lookup_rowkey_latency("doris_pk", "tablet_lookup_
 bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk", "lookup_not_found");
 bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second(
         "doris_pk", "lookup_not_found_per_second", &g_tablet_pk_not_found, 60);
+bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk", 
"update_delete_bitmap");
 
 // read columns by read plan
 // read_index: ori_pos-> block_idx
@@ -1059,4 +1064,200 @@ Status BaseTablet::_capture_consistent_rowsets_unlocked(
     return Status::OK();
 }
 
+// Verifies that `delete_bitmap` holds a sentinel entry for every rowset in `rowset_ids`.
+//
+// The sentinel key is {rowset id, DeleteBitmap::INVALID_SEGMENT_ID,
+// DeleteBitmap::TEMP_VERSION_COMMON}. If any rowset lacks it, the function logs a JSON
+// document listing the required rowsets and the missing rowset ids, trips a DCHECK (debug
+// builds) and returns InternalError. Otherwise it returns OK.
+//
+// `max_version` and `txn_id` are only used in the log output. `rowsets`, when non-null,
+// supplies the rowsets listed as "required"; when null they are looked up from
+// `rowset_ids` under the meta lock.
+Status BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap,
+                                                   int64_t max_version, int64_t txn_id,
+                                                   const RowsetIdUnorderedSet& rowset_ids,
+                                                   std::vector<RowsetSharedPtr>* rowsets) {
+    RowsetIdUnorderedSet missing_ids;
+    for (const auto& rowset_id : rowset_ids) {
+        const bool has_sentinel = delete_bitmap->delete_bitmap.contains(
+                {rowset_id, DeleteBitmap::INVALID_SEGMENT_ID, DeleteBitmap::TEMP_VERSION_COMMON});
+        if (!has_sentinel) {
+            missing_ids.insert(rowset_id);
+        }
+    }
+
+    if (missing_ids.empty()) {
+        return Status::OK();
+    }
+
+    LOG(WARNING) << "[txn_id:" << txn_id << "][tablet_id:" << tablet_id()
+                 << "][max_version: " << max_version
+                 << "] check delete bitmap correctness failed!";
+    rapidjson::Document root;
+    root.SetObject();
+    rapidjson::Document required_rowsets_arr;
+    required_rowsets_arr.SetArray();
+    rapidjson::Document missing_rowsets_arr;
+    missing_rowsets_arr.SetArray();
+
+    // Appends a copy of `text` to `arr`, allocating from `arr`'s own allocator.
+    auto append_string = [](rapidjson::Document& arr, const std::string& text) {
+        rapidjson::Value value;
+        value.SetString(text.c_str(), text.length(), arr.GetAllocator());
+        arr.PushBack(value, arr.GetAllocator());
+    };
+
+    // Use the caller's rowsets when given, otherwise resolve them from the ids.
+    std::vector<RowsetSharedPtr> looked_up_rowsets;
+    const std::vector<RowsetSharedPtr>* required_rowsets = rowsets;
+    if (required_rowsets == nullptr) {
+        {
+            std::shared_lock meta_rlock(_meta_lock);
+            looked_up_rowsets = get_rowset_by_ids(&rowset_ids);
+        }
+        required_rowsets = &looked_up_rowsets;
+    }
+    for (const auto& required_rowset : *required_rowsets) {
+        append_string(required_rowsets_arr, required_rowset->get_rowset_info_str());
+    }
+    for (const auto& missing_rowset_id : missing_ids) {
+        append_string(missing_rowsets_arr, missing_rowset_id.to_string());
+    }
+
+    root.AddMember("required_rowsets", required_rowsets_arr, root.GetAllocator());
+    root.AddMember("missing_rowsets", missing_rowsets_arr, root.GetAllocator());
+    rapidjson::StringBuffer strbuf;
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
+    root.Accept(writer);
+    std::string rowset_status_string = std::string(strbuf.GetString());
+    LOG_EVERY_SECOND(WARNING) << rowset_status_string;
+    // let it crash if correctness check failed in Debug mode
+    DCHECK(false) << "delete bitmap correctness check failed in publish phase!";
+    return Status::InternalError("check delete bitmap failed!");
+}
+
+// Erases every entry of `delete_bitmap` whose key has DeleteBitmap::INVALID_SEGMENT_ID as
+// its segment id (the second key component); all other entries are left in place.
+void BaseTablet::_remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete_bitmap) {
+    std::erase_if(delete_bitmap->delete_bitmap, [](const auto& entry) {
+        return std::get<1>(entry.first) == DeleteBitmap::INVALID_SEGMENT_ID;
+    });
+}
+
+// Brings the delete bitmap of a transaction up to date against the tablet's current rowsets
+// and hands the result to `self->save_delete_bitmap()`.
+//
+// Outline:
+//   1. Create a transient rowset writer and load the segments of the txn's rowset.
+//   2. Collect the rowset ids visible at `cur_version - 1` and diff them against
+//      `txn_info->rowset_ids`: entries of vanished rowsets are removed from the bitmap,
+//      newly visible rowsets are the ones to calculate against.
+//   3. Calculate the delete bitmap (in this thread for at most one segment, otherwise via
+//      the calc-delete-bitmap executor).
+//   4. Optionally run the correctness check and strip the sentinel marks.
+//   5. For partial updates, flush/build the transient rowset and merge its meta into the
+//      txn's rowset.
+//   6. Save the bitmap.
+//
+// Returns OK without doing steps 2-6 when the tablet is in TABLET_NOTREADY state.
+// `txn_expiration` is forwarded to create_transient_rowset_writer().
+Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self, const TabletTxnInfo* txn_info,
+                                        int64_t txn_id, int64_t txn_expiration) {
+    SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
+    RowsetIdUnorderedSet cur_rowset_ids;
+    RowsetIdUnorderedSet rowset_ids_to_add;
+    RowsetIdUnorderedSet rowset_ids_to_del;
+    RowsetSharedPtr rowset = txn_info->rowset;
+    int64_t cur_version = rowset->start_version();
+
+    auto rowset_writer = DORIS_TRY(self->create_transient_rowset_writer(
+            *rowset, txn_info->partial_update_info, txn_expiration));
+
+    DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap;
+    // Partial update might generate new segments when there is conflicts while publish, and mark
+    // the same key in original segments as delete.
+    // When the new segment flush fails or the rowset build fails, the deletion marker for the
+    // duplicate key of the original segment should not remain in `txn_info->delete_bitmap`,
+    // so we need to make a copy of `txn_info->delete_bitmap` and make changes on it.
+    if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) {
+        delete_bitmap = std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap));
+    }
+
+    OlapStopWatch watch;
+    std::vector<segment_v2::SegmentSharedPtr> segments;
+    // The cast yields null for any rowset that is not a BetaRowset; report that as an error
+    // instead of dereferencing a null pointer.
+    auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+    if (beta_rowset == nullptr) {
+        return Status::InternalError("update delete bitmap failed: rowset is not a BetaRowset");
+    }
+    RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
+    auto t1 = watch.get_elapse_time_us();
+
+    {
+        std::shared_lock meta_rlock(self->_meta_lock);
+        // tablet is under alter process. The delete bitmap will be calculated after conversion.
+        if (self->tablet_state() == TABLET_NOTREADY) {
+            LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id="
+                      << self->tablet_id();
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, &cur_rowset_ids));
+    }
+    auto t2 = watch.get_elapse_time_us();
+
+    _rowset_ids_difference(cur_rowset_ids, txn_info->rowset_ids, &rowset_ids_to_add,
+                           &rowset_ids_to_del);
+    for (const auto& to_del : rowset_ids_to_del) {
+        delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
+    }
+
+    std::vector<RowsetSharedPtr> specified_rowsets;
+    {
+        std::shared_lock meta_rlock(self->_meta_lock);
+        specified_rowsets = self->get_rowset_by_ids(&rowset_ids_to_add);
+    }
+    auto t3 = watch.get_elapse_time_us();
+
+    // When there is only one segment, it will be calculated in the current thread.
+    // Otherwise, it will be submitted to the thread pool for calculation.
+    if (segments.size() <= 1) {
+        RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap,
+                                           cur_version - 1, nullptr, rowset_writer.get()));
+
+    } else {
+        auto token = self->calc_delete_bitmap_executor()->create_token();
+        RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, specified_rowsets, delete_bitmap,
+                                           cur_version - 1, token.get(), rowset_writer.get()));
+        RETURN_IF_ERROR(token->wait());
+    }
+
+    // Below one second only the calculation time is reported; above it, a per-phase breakdown.
+    std::stringstream ss;
+    if (watch.get_elapse_time_us() < 1 * 1000 * 1000) {
+        ss << "cost: " << watch.get_elapse_time_us() - t3 << "(us)";
+    } else {
+        ss << "cost(us): (load segments: " << t1 << ", get all rsid: " << t2 - t1
+           << ", get rowsets: " << t3 - t2
+           << ", calc delete bitmap: " << watch.get_elapse_time_us() - t3 << ")";
+    }
+
+    // The initial value fixes the accumulator type of std::accumulate. It must be size_t:
+    // a plain `0` would accumulate in int and truncate large row counts.
+    size_t total_rows = std::accumulate(
+            segments.begin(), segments.end(), size_t {0},
+            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum + s->num_rows(); });
+    LOG(INFO) << "[Publish] construct delete bitmap tablet: " << self->tablet_id()
+              << ", rowset_ids to add: " << rowset_ids_to_add.size()
+              << ", rowset_ids to del: " << rowset_ids_to_del.size()
+              << ", cur version: " << cur_version << ", transaction_id: " << txn_id << ","
+              << ss.str() << " , total rows: " << total_rows;
+
+    if (config::enable_merge_on_write_correctness_check && rowset->num_rows() != 0) {
+        // only do correctness check if the rowset has at least one row written
+        // check if all the rowset has ROWSET_SENTINEL_MARK
+        // Pass the real txn id so the check's own warning identifies the transaction.
+        auto st = self->check_delete_bitmap_correctness(delete_bitmap, cur_version - 1, txn_id,
+                                                        cur_rowset_ids, &specified_rowsets);
+        if (!st.ok()) {
+            // A failed check is only logged here; it does not abort the update.
+            LOG(WARNING) << "delete bitmap correctness check failed in publish phase!";
+        }
+        self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
+    }
+
+    if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update) {
+        DBUG_EXECUTE_IF("Tablet.update_delete_bitmap.partial_update_write_rowset_fail", {
+            if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
+                LOG_WARNING("Tablet.update_delete_bitmap.partial_update_write_rowset random failed")
+                        .tag("txn_id", txn_id);
+                return Status::InternalError(
+                        "debug update_delete_bitmap partial update write rowset random failed");
+            }
+        });
+        // build rowset writer and merge transient rowset
+        RETURN_IF_ERROR(rowset_writer->flush());
+        RowsetSharedPtr transient_rowset;
+        RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
+        rowset->merge_rowset_meta(transient_rowset->rowset_meta());
+
+        // erase segment cache cause we will add a segment to rowset
+        SegmentLoader::instance()->erase_segments(rowset->rowset_id(), rowset->num_segments());
+    }
+
+    RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap, rowset_writer.get(),
+                                             cur_rowset_ids));
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 943c257de64..837ff28d461 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -22,6 +22,7 @@
 #include <string>
 
 #include "common/status.h"
+#include "olap/partial_update_info.h"
 #include "olap/rowset/segment_v2/segment.h"
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_meta.h"
@@ -178,6 +179,10 @@ public:
     static void add_sentinel_mark_to_delete_bitmap(DeleteBitmap* delete_bitmap,
                                                    const RowsetIdUnorderedSet& 
rowsetids);
 
+    // Checks that `delete_bitmap` contains a sentinel entry for every rowset id in
+    // `rowset_ids`. Returns OK when none is missing, InternalError otherwise (after logging
+    // the required rowsets and the missing rowset ids).
+    // `max_version` and `txn_id` are only used for logging. `rowsets`, if given, is the list
+    // logged as the required rowsets; if null, the rowsets are looked up from `rowset_ids`.
+    Status check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, 
int64_t max_version,
+                                           int64_t txn_id, const 
RowsetIdUnorderedSet& rowset_ids,
+                                           std::vector<RowsetSharedPtr>* 
rowsets = nullptr);
+
     static Status generate_new_block_for_partial_update(
             TabletSchemaSPtr rowset_schema, const std::vector<uint32>& 
missing_cids,
             const std::vector<uint32>& update_cids, const 
PartialUpdateReadPlan& read_plan_ori,
@@ -198,6 +203,18 @@ public:
                                         const std::vector<uint32_t>& rowids,
                                         const TabletColumn& tablet_column,
                                         vectorized::MutableColumnPtr& dst);
+
+    // Creates the rowset writer that update_delete_bitmap() passes to calc_delete_bitmap()
+    // and, for partial updates, flushes and builds into a transient rowset whose meta is
+    // merged into `rowset`.
+    virtual Result<std::unique_ptr<RowsetWriter>> 
create_transient_rowset_writer(
+            const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> 
partial_update_info,
+            int64_t txn_expiration = 0) = 0;
+
+    // Recalculates the delete bitmap of transaction `txn_id` on tablet `self` against the
+    // rowsets visible below the txn rowset's start version, then stores it through
+    // save_delete_bitmap(). `txn_expiration` is forwarded to
+    // create_transient_rowset_writer().
+    static Status update_delete_bitmap(const BaseTabletSPtr& self, const 
TabletTxnInfo* txn_info,
+                                       int64_t txn_id, int64_t txn_expiration 
= 0);
+
+    // Stores the final `delete_bitmap` of transaction `txn_id`. Called as the last step of
+    // update_delete_bitmap() with the writer it used and the rowset ids the bitmap was
+    // calculated against (`cur_rowset_ids`).
+    virtual Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t 
txn_id,
+                                      DeleteBitmapPtr delete_bitmap, 
RowsetWriter* rowset_writer,
+                                      const RowsetIdUnorderedSet& 
cur_rowset_ids) = 0;
+    virtual CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() = 0;
     
////////////////////////////////////////////////////////////////////////////
     // end MoW functions
     
////////////////////////////////////////////////////////////////////////////
@@ -216,6 +233,7 @@ protected:
     static void _rowset_ids_difference(const RowsetIdUnorderedSet& cur,
                                        const RowsetIdUnorderedSet& pre,
                                        RowsetIdUnorderedSet* to_add, 
RowsetIdUnorderedSet* to_del);
+    static void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr 
delete_bitmap);
 
     Status _capture_consistent_rowsets_unlocked(const std::vector<Version>& 
version_path,
                                                 std::vector<RowsetSharedPtr>* 
rowsets) const;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 8556e19483f..f58daf6816a 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -166,7 +166,7 @@ Status BaseBetaRowsetWriter::add_block(const 
vectorized::Block* block) {
     return _segment_creator.add_block(block);
 }
 
-Status BetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
+Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) {
     SCOPED_RAW_TIMER(&_delete_bitmap_ns);
     if (!_context.tablet->enable_unique_key_merge_on_write() ||
         (_context.partial_update_info && 
_context.partial_update_info->is_partial_update)) {
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index 3681e0fe120..cffb951451e 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -125,12 +125,11 @@ public:
     }
 
 private:
-    virtual Status _generate_delete_bitmap(int32_t segment_id) = 0;
-
     void update_rowset_schema(TabletSchemaSPtr flush_schema);
     // build a tmp rowset for load segment to calc delete_bitmap
     // for this segment
 protected:
+    Status _generate_delete_bitmap(int32_t segment_id);
     Status _build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num 
= false);
     Status _create_file_writer(std::string path, io::FileWriterPtr& 
file_writer);
     virtual Status _close_file_writers();
@@ -193,8 +192,6 @@ public:
             KeyBoundsPB& key_bounds);
 
 private:
-    Status _generate_delete_bitmap(int32_t segment_id) override;
-
     // segment compaction
     friend class SegcompactionWorker;
     Status _close_file_writers() override;
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index bc946b3e06e..781510f2a21 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -75,6 +75,10 @@ void Rowset::make_visible(Version version) {
     }
 }
 
+void Rowset::set_version(Version version) { // forwards to the rowset meta; no other member is touched here
+    _rowset_meta->set_version(version); // NOTE(review): _lock is not taken here - confirm that is intended
+}
+
 bool Rowset::check_rowset_segment() {
     std::lock_guard load_lock(_lock);
     return check_current_rowset_segment();
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index d9da9907405..b95aad700ae 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -138,6 +138,7 @@ public:
 
     // publish rowset to make it visible to read
     void make_visible(Version version);
+    void set_version(Version version);
     const TabletSchemaSPtr& tablet_schema() const { return _schema; }
 
     // helper class to access RowsetMeta
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 0dfaf4b7466..235a698021b 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -344,11 +344,6 @@ void 
SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
 // 3. set columns to data convertor and then write all columns
 Status SegmentWriter::append_block_with_partial_content(const 
vectorized::Block* block,
                                                         size_t row_pos, size_t 
num_rows) {
-    if (config::is_cloud_mode()) {
-        // TODO(plat1ko): cloud mode
-        return Status::NotSupported("append_block_with_partial_content");
-    }
-
     auto* tablet = static_cast<Tablet*>(_tablet.get());
     if (block->columns() <= _tablet_schema->num_key_columns() ||
         block->columns() >= _tablet_schema->num_columns()) {
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
index 1e3ff116783..16d5e60b09a 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -290,11 +290,6 @@ void 
VerticalSegmentWriter::_serialize_block_to_row_column(vectorized::Block& bl
 //       2.3 fill block
 // 3. set columns to data convertor and then write all columns
 Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& 
data) {
-    if (config::is_cloud_mode()) {
-        // TODO(plat1ko): CloudStorageEngine
-        return Status::NotSupported("append_block_with_partial_content");
-    }
-
     DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write);
     DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);
 
@@ -527,10 +522,6 @@ Status VerticalSegmentWriter::_fill_missing_columns(
         vectorized::MutableColumns& mutable_full_columns,
         const std::vector<bool>& use_default_or_null_flag, bool 
has_default_or_nullable,
         const size_t& segment_start_pos) {
-    if (config::is_cloud_mode()) {
-        // TODO(plat1ko): CloudStorageEngine
-        return Status::NotSupported("fill_missing_columns");
-    }
     auto tablet = static_cast<Tablet*>(_tablet.get());
     // create old value columns
     const auto& missing_cids = 
_opts.rowset_ctx->partial_update_info->missing_cids;
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index 62263b8ae3c..6a6da21ac62 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -119,7 +119,7 @@ void RowsetBuilder::_garbage_collection() {
     }
 }
 
-Status RowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& 
mow_context) {
+Status BaseRowsetBuilder::init_mow_context(std::shared_ptr<MowContext>& 
mow_context) {
     std::lock_guard<std::shared_mutex> lck(tablet()->get_header_lock());
     int64_t cur_max_version = tablet()->max_version_unlocked();
     // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
diff --git a/be/src/olap/rowset_builder.h b/be/src/olap/rowset_builder.h
index 5563ece1b38..e54faee3435 100644
--- a/be/src/olap/rowset_builder.h
+++ b/be/src/olap/rowset_builder.h
@@ -82,6 +82,8 @@ public:
         return _partial_update_info;
     }
 
+    Status init_mow_context(std::shared_ptr<MowContext>& mow_context);
+
 protected:
     void _build_current_tablet_schema(int64_t index_id,
                                       const OlapTableSchemaParam* 
table_schema_param,
@@ -133,8 +135,6 @@ private:
 
     void _garbage_collection();
 
-    Status init_mow_context(std::shared_ptr<MowContext>& mow_context);
-
     // Cast `BaseTablet` to `Tablet`
     Tablet* tablet();
     TabletSharedPtr tablet_sptr();
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index efe63591e44..7f72af7f86c 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -146,8 +146,6 @@ using io::FileSystemSPtr;
 
 namespace {
 
-bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk", 
"update_delete_bitmap");
-
 bvar::Adder<uint64_t> exceed_version_limit_counter;
 bvar::Window<bvar::Adder<uint64_t>> exceed_version_limit_counter_minute(
         &exceed_version_limit_counter, 60);
@@ -169,14 +167,6 @@ void set_last_failure_time(Tablet* tablet, const 
Compaction& compaction, int64_t
     }
 };
 
-// load segment may do io so it should out lock
-Status _load_rowset_segments(const RowsetSharedPtr& rowset,
-                             std::vector<segment_v2::SegmentSharedPtr>* 
segments) {
-    auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
-    RETURN_IF_ERROR(beta_rowset->load_segments(segments));
-    return Status::OK();
-}
-
 } // namespace
 
 struct WriteCooldownMetaExecutors {
@@ -1757,7 +1747,8 @@ Result<std::unique_ptr<RowsetWriter>> 
Tablet::create_rowset_writer(RowsetWriterC
 // create a rowset writer with rowset_id and seg_id
 // after writer, merge this transient rowset with original rowset
 Result<std::unique_ptr<RowsetWriter>> Tablet::create_transient_rowset_writer(
-        const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> 
partial_update_info) {
+        const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> 
partial_update_info,
+        int64_t txn_expiration) {
     RowsetWriterContext context;
     context.rowset_state = PREPARED;
     context.segments_overlap = OVERLAPPING;
@@ -2322,7 +2313,7 @@ Status Tablet::update_delete_bitmap_without_lock(const 
TabletSharedPtr& self,
     });
     int64_t cur_version = rowset->end_version();
     std::vector<segment_v2::SegmentSharedPtr> segments;
-    RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
+    
RETURN_IF_ERROR(std::dynamic_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
 
     // If this rowset does not have a segment, there is no need for an update.
     if (segments.empty()) {
@@ -2365,117 +2356,22 @@ Status Tablet::update_delete_bitmap_without_lock(const 
TabletSharedPtr& self,
     return Status::OK();
 }
 
-Status Tablet::update_delete_bitmap(const TabletSharedPtr& self, const 
TabletTxnInfo* txn_info,
-                                    int64_t txn_id) {
-    SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
-    RowsetIdUnorderedSet cur_rowset_ids;
-    RowsetIdUnorderedSet rowset_ids_to_add;
-    RowsetIdUnorderedSet rowset_ids_to_del;
+CalcDeleteBitmapExecutor* Tablet::calc_delete_bitmap_executor() { // declared as an override in tablet.h
+    return _engine.calc_delete_bitmap_executor(); // hands back _engine's executor unchanged; nothing is created here
+}
+
+Status Tablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t 
txn_id,
+                                  DeleteBitmapPtr delete_bitmap, RowsetWriter* 
rowset_writer,
+                                  const RowsetIdUnorderedSet& cur_rowset_ids) {
     RowsetSharedPtr rowset = txn_info->rowset;
     int64_t cur_version = rowset->start_version();
 
-    auto rowset_writer =
-            DORIS_TRY(self->create_transient_rowset_writer(*rowset, 
txn_info->partial_update_info));
-
-    DeleteBitmapPtr delete_bitmap = txn_info->delete_bitmap;
-    // Partial update might generate new segments when there is conflicts 
while publish, and mark
-    // the same key in original segments as delete.
-    // When the new segment flush fails or the rowset build fails, the 
deletion marker for the
-    // duplicate key of the original segment should not remain in 
`txn_info->delete_bitmap`,
-    // so we need to make a copy of `txn_info->delete_bitmap` and make changes 
on it.
-    if (txn_info->partial_update_info && 
txn_info->partial_update_info->is_partial_update) {
-        delete_bitmap = 
std::make_shared<DeleteBitmap>(*(txn_info->delete_bitmap));
-    }
-
-    OlapStopWatch watch;
-    std::vector<segment_v2::SegmentSharedPtr> segments;
-    RETURN_IF_ERROR(_load_rowset_segments(rowset, &segments));
-    auto t1 = watch.get_elapse_time_us();
-
-    {
-        std::shared_lock meta_rlock(self->_meta_lock);
-        // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-        if (self->tablet_state() == TABLET_NOTREADY) {
-            LOG(INFO) << "tablet is under alter process, update delete bitmap 
later, tablet_id="
-                      << self->tablet_id();
-            return Status::OK();
-        }
-        RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1, 
&cur_rowset_ids));
-    }
-    auto t2 = watch.get_elapse_time_us();
-
-    _rowset_ids_difference(cur_rowset_ids, txn_info->rowset_ids, 
&rowset_ids_to_add,
-                           &rowset_ids_to_del);
-    for (const auto& to_del : rowset_ids_to_del) {
-        delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
-    }
-
-    std::vector<RowsetSharedPtr> specified_rowsets;
-    {
-        std::shared_lock meta_rlock(self->_meta_lock);
-        specified_rowsets = self->get_rowset_by_ids(&rowset_ids_to_add);
-    }
-    auto t3 = watch.get_elapse_time_us();
-
-    auto token = self->_engine.calc_delete_bitmap_executor()->create_token();
-    RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments, 
specified_rowsets, delete_bitmap,
-                                       cur_version - 1, token.get(), 
rowset_writer.get()));
-    RETURN_IF_ERROR(token->wait());
-
-    std::stringstream ss;
-    if (watch.get_elapse_time_us() < 1 * 1000 * 1000) {
-        ss << "cost: " << watch.get_elapse_time_us() - t3 << "(us)";
-    } else {
-        ss << "cost(us): (load segments: " << t1 << ", get all rsid: " << t2 - 
t1
-           << ", get rowsets: " << t3 - t2
-           << ", calc delete bitmap: " << watch.get_elapse_time_us() - t3 << 
")";
-    }
-
-    size_t total_rows = std::accumulate(
-            segments.begin(), segments.end(), 0,
-            [](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum 
+= s->num_rows(); });
-    LOG(INFO) << "[Publish] construct delete bitmap tablet: " << 
self->tablet_id()
-              << ", rowset_ids to add: " << rowset_ids_to_add.size()
-              << ", rowset_ids to del: " << rowset_ids_to_del.size()
-              << ", cur version: " << cur_version << ", transaction_id: " << 
txn_id << ","
-              << ss.str() << " , total rows: " << total_rows;
-
-    if (config::enable_merge_on_write_correctness_check && rowset->num_rows() 
!= 0) {
-        // only do correctness check if the rowset has at least one row written
-        // check if all the rowset has ROWSET_SENTINEL_MARK
-        auto st = self->check_delete_bitmap_correctness(delete_bitmap, 
cur_version - 1, -1,
-                                                        cur_rowset_ids, 
&specified_rowsets);
-        if (!st.ok()) {
-            LOG(WARNING) << fmt::format("delete bitmap correctness check 
failed in publish phase!");
-        }
-        self->_remove_sentinel_mark_from_delete_bitmap(delete_bitmap);
-    }
-
-    if (txn_info->partial_update_info && 
txn_info->partial_update_info->is_partial_update) {
-        
DBUG_EXECUTE_IF("Tablet.update_delete_bitmap.partial_update_write_rowset_fail", 
{
-            if (rand() % 100 < (100 * dp->param("percent", 0.5))) {
-                
LOG_WARNING("Tablet.update_delete_bitmap.partial_update_write_rowset random 
failed")
-                        .tag("txn_id", txn_id);
-                return Status::InternalError(
-                        "debug update_delete_bitmap partial update write 
rowset random failed");
-            }
-        });
-        // build rowset writer and merge transient rowset
-        RETURN_IF_ERROR(rowset_writer->flush());
-        RowsetSharedPtr transient_rowset;
-        RETURN_IF_ERROR(rowset_writer->build(transient_rowset));
-        rowset->merge_rowset_meta(transient_rowset->rowset_meta());
-
-        // erase segment cache cause we will add a segment to rowset
-        SegmentLoader::instance()->erase_segments(rowset->rowset_id(), 
rowset->num_segments());
-    }
-
     // update version without write lock, compaction and publish_txn
     // will update delete bitmap, handle compaction with _rowset_update_lock
     // and publish_txn runs sequential so no need to lock here
     for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
-        self->_tablet_meta->delete_bitmap().merge({std::get<0>(key), 
std::get<1>(key), cur_version},
-                                                  bitmap);
+        _tablet_meta->delete_bitmap().merge({std::get<0>(key), 
std::get<1>(key), cur_version},
+                                            bitmap);
     }
 
     return Status::OK();
@@ -2536,7 +2432,8 @@ Status Tablet::check_rowid_conversion(
         return Status::OK();
     }
     std::vector<segment_v2::SegmentSharedPtr> dst_segments;
-    RETURN_IF_ERROR(_load_rowset_segments(dst_rowset, &dst_segments));
+    RETURN_IF_ERROR(
+            
std::dynamic_pointer_cast<BetaRowset>(dst_rowset)->load_segments(&dst_segments));
     std::unordered_map<RowsetId, std::vector<segment_v2::SegmentSharedPtr>, 
HashOfRowsetId>
             input_rowsets_segment;
 
@@ -2545,7 +2442,8 @@ Status Tablet::check_rowid_conversion(
         std::vector<segment_v2::SegmentSharedPtr>& segments =
                 input_rowsets_segment[src_rowset->rowset_id()];
         if (segments.empty()) {
-            RETURN_IF_ERROR(_load_rowset_segments(src_rowset, &segments));
+            RETURN_IF_ERROR(
+                    
std::dynamic_pointer_cast<BetaRowset>(src_rowset)->load_segments(&segments));
         }
         for (auto& [src, dst] : locations) {
             std::string src_key;
@@ -2760,81 +2658,4 @@ Status Tablet::ingest_binlog_metas(RowsetBinlogMetasPB* 
metas_pb) {
     return RowsetMetaManager::ingest_binlog_metas(_data_dir->get_meta(), 
tablet_uid(), metas_pb);
 }
 
-void Tablet::_remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr 
delete_bitmap) {
-    for (auto it = delete_bitmap->delete_bitmap.begin(), end = 
delete_bitmap->delete_bitmap.end();
-         it != end;) {
-        if (std::get<1>(it->first) == DeleteBitmap::INVALID_SEGMENT_ID) {
-            it = delete_bitmap->delete_bitmap.erase(it);
-        } else {
-            ++it;
-        }
-    }
-}
-
-Status Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, 
int64_t max_version,
-                                               int64_t txn_id,
-                                               const RowsetIdUnorderedSet& 
rowset_ids,
-                                               std::vector<RowsetSharedPtr>* 
rowsets) {
-    RowsetIdUnorderedSet missing_ids;
-    for (const auto& rowsetid : rowset_ids) {
-        if (!delete_bitmap->delete_bitmap.contains({rowsetid, 
DeleteBitmap::INVALID_SEGMENT_ID,
-                                                    
DeleteBitmap::TEMP_VERSION_COMMON})) {
-            missing_ids.insert(rowsetid);
-        }
-    }
-
-    if (!missing_ids.empty()) {
-        LOG(WARNING) << "[txn_id:" << txn_id << "][tablet_id:" << tablet_id()
-                     << "][max_version: " << max_version
-                     << "] check delete bitmap correctness failed!";
-        rapidjson::Document root;
-        root.SetObject();
-        rapidjson::Document required_rowsets_arr;
-        required_rowsets_arr.SetArray();
-        rapidjson::Document missing_rowsets_arr;
-        missing_rowsets_arr.SetArray();
-
-        if (rowsets != nullptr) {
-            for (const auto& rowset : *rowsets) {
-                rapidjson::Value value;
-                std::string version_str = rowset->get_rowset_info_str();
-                value.SetString(version_str.c_str(), version_str.length(),
-                                required_rowsets_arr.GetAllocator());
-                required_rowsets_arr.PushBack(value, 
required_rowsets_arr.GetAllocator());
-            }
-        } else {
-            std::vector<RowsetSharedPtr> rowsets;
-            {
-                std::shared_lock meta_rlock(_meta_lock);
-                rowsets = get_rowset_by_ids(&rowset_ids);
-            }
-            for (const auto& rowset : rowsets) {
-                rapidjson::Value value;
-                std::string version_str = rowset->get_rowset_info_str();
-                value.SetString(version_str.c_str(), version_str.length(),
-                                required_rowsets_arr.GetAllocator());
-                required_rowsets_arr.PushBack(value, 
required_rowsets_arr.GetAllocator());
-            }
-        }
-        for (const auto& missing_rowset_id : missing_ids) {
-            rapidjson::Value miss_value;
-            std::string rowset_id_str = missing_rowset_id.to_string();
-            miss_value.SetString(rowset_id_str.c_str(), rowset_id_str.length(),
-                                 missing_rowsets_arr.GetAllocator());
-            missing_rowsets_arr.PushBack(miss_value, 
missing_rowsets_arr.GetAllocator());
-        }
-
-        root.AddMember("required_rowsets", required_rowsets_arr, 
root.GetAllocator());
-        root.AddMember("missing_rowsets", missing_rowsets_arr, 
root.GetAllocator());
-        rapidjson::StringBuffer strbuf;
-        rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
-        root.Accept(writer);
-        std::string rowset_status_string = std::string(strbuf.GetString());
-        LOG_EVERY_SECOND(WARNING) << rowset_status_string;
-        // let it crash if correctness check failed in Debug mode
-        DCHECK(false) << "delete bitmap correctness check failed in publish 
phase!";
-        return Status::InternalError("check delete bitmap failed!");
-    }
-    return Status::OK();
-}
 } // namespace doris
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index aea9dade310..873f22bff74 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -70,7 +70,6 @@ class TupleDescriptor;
 class CalcDeleteBitmapToken;
 enum CompressKind : int;
 class RowsetBinlogMetasPB;
-struct TabletTxnInfo;
 
 namespace io {
 class RemoteFileSystem;
@@ -306,7 +305,8 @@ public:
                                                                bool vertical) 
override;
 
     Result<std::unique_ptr<RowsetWriter>> create_transient_rowset_writer(
-            const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> 
partial_update_info);
+            const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> 
partial_update_info,
+            int64_t txn_expiration = 0) override;
     Result<std::unique_ptr<RowsetWriter>> create_transient_rowset_writer(
             RowsetWriterContext& context, const RowsetId& rowset_id);
 
@@ -370,12 +370,14 @@ public:
     // end cooldown functions
     
////////////////////////////////////////////////////////////////////////////
 
-    static Status update_delete_bitmap(const TabletSharedPtr& self, const 
TabletTxnInfo* txn_info,
-                                       int64_t txn_id);
-
     static Status update_delete_bitmap_without_lock(const TabletSharedPtr& 
self,
                                                     const RowsetSharedPtr& 
rowset);
 
+    CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
+    Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
+                              DeleteBitmapPtr delete_bitmap, RowsetWriter* 
rowset_writer,
+                              const RowsetIdUnorderedSet& cur_rowset_ids) 
override;
+
     void calc_compaction_output_rowset_delete_bitmap(
             const std::vector<RowsetSharedPtr>& input_rowsets,
             const RowIdConversion& rowid_conversion, uint64_t start_version, 
uint64_t end_version,
@@ -448,9 +450,6 @@ public:
 
     void set_binlog_config(BinlogConfig binlog_config);
 
-    Status check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, 
int64_t max_version,
-                                           int64_t txn_id, const 
RowsetIdUnorderedSet& rowset_ids,
-                                           std::vector<RowsetSharedPtr>* 
rowsets = nullptr);
     void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; }
     bool is_alter_failed() { return _alter_failed; }
 
@@ -488,8 +487,6 @@ private:
     // end cooldown functions
     
////////////////////////////////////////////////////////////////////////////
 
-    void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr 
delete_bitmap);
-
 public:
     static const int64_t K_INVALID_CUMULATIVE_POINT = -1;
 
diff --git a/be/src/olap/tablet_fwd.h b/be/src/olap/tablet_fwd.h
index c54164c57cb..3583a286643 100644
--- a/be/src/olap/tablet_fwd.h
+++ b/be/src/olap/tablet_fwd.h
@@ -26,6 +26,8 @@ class Tablet;
 class TabletSchema;
 class TabletMeta;
 class DeleteBitmap;
+class CalcDeleteBitmapExecutor;
+struct TabletTxnInfo;
 
 using BaseTabletSPtr = std::shared_ptr<BaseTablet>;
 using TabletSharedPtr = std::shared_ptr<Tablet>;
diff --git a/be/src/runtime/memory/cache_policy.h 
b/be/src/runtime/memory/cache_policy.h
index 6de3de600d8..538b30099f4 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -44,6 +44,7 @@ public:
         TABLET_SCHEMA_CACHE = 14,
         CREATE_TABLET_RR_IDX_CACHE = 15,
         CLOUD_TABLET_CACHE = 16,
+        CLOUD_TXN_DELETE_BITMAP_CACHE = 17,
     };
 
     static std::string type_string(CacheType type) {
@@ -82,6 +83,8 @@ public:
             return "CreateTabletRRIdxCache";
         case CacheType::CLOUD_TABLET_CACHE:
             return "CloudTabletCache";
+        case CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE:
+            return "CloudTxnDeleteBitmapCache";
         default:
             LOG(FATAL) << "not match type of cache policy :" << 
static_cast<int>(type);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index d201e065804..00ca450ff2e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -280,11 +280,6 @@ public class CreateTableStmt extends DdlStmt {
 
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
-        if (Config.isCloudMode() && properties != null
-                && 
properties.containsKey(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE)) {
-            // FIXME: MOW is not supported in cloud mode yet.
-            properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, 
"false");
-        }
         if (Strings.isNullOrEmpty(engineName) || 
engineName.equalsIgnoreCase(DEFAULT_ENGINE_NAME)) {
             this.properties = maybeRewriteByAutoBucket(distributionDesc, 
properties);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 825e55ee09e..0c73a19c7d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -57,6 +57,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 
 /**
@@ -82,7 +83,10 @@ public abstract class Table extends MetaObject implements 
Writable, TableIf {
     @SerializedName(value = "createTime")
     protected long createTime;
     protected QueryableReentrantReadWriteLock rwLock;
-
+    // Used for queuing commit transaction tasks to avoid fdb transaction 
conflicts,
+    // especially to reduce conflicts when obtaining delete bitmap update 
locks for
+    // MoW table
+    protected ReentrantLock commitLock;
     /*
      *  fullSchema and nameToColumn should contains all columns, both visible 
and shadow.
      *  eg. for OlapTable, when doing schema change, there will be some shadow 
columns which are not visible
@@ -131,6 +135,7 @@ public abstract class Table extends MetaObject implements 
Writable, TableIf {
         if (Config.check_table_lock_leaky) {
             this.readLockThreads = Maps.newConcurrentMap();
         }
+        this.commitLock = new ReentrantLock(true);
     }
 
     public Table(long id, String tableName, TableType type, List<Column> 
fullSchema) {
@@ -155,6 +160,7 @@ public abstract class Table extends MetaObject implements 
Writable, TableIf {
         if (Config.check_table_lock_leaky) {
             this.readLockThreads = Maps.newConcurrentMap();
         }
+        this.commitLock = new ReentrantLock(true);
     }
 
     public void markDropped() {
@@ -297,6 +303,45 @@ public abstract class Table extends MetaObject implements 
Writable, TableIf {
         return false;
     }
 
+    /**
+     * Blocks until the calling thread holds this table's commit lock.
+     * Release it with {@link #commitUnlock()}.
+     */
+    public void commitLock() {
+        this.commitLock.lock();
+    }
+
+    /**
+     * Tries to take this table's commit lock, waiting at most {@code timeout} {@code unit}.
+     *
+     * @param timeout maximum time to wait
+     * @param unit time unit of {@code timeout}
+     * @return true if the lock was acquired; false on timeout or when the wait was interrupted
+     */
+    public boolean tryCommitLock(long timeout, TimeUnit unit) {
+        try {
+            boolean res = this.commitLock.tryLock(timeout, unit);
+            if (!res && unit.toSeconds(timeout) >= 1) {
+                // Report commitLock's own state; rwLock's owner is unrelated to this lock.
+                LOG.warn("Failed to try table {}'s cloud commit lock. timeout {} {}. Queue length: {}",
+                        name, timeout, unit.name(), commitLock.getQueueLength());
+            }
+            return res;
+        } catch (InterruptedException e) {
+            LOG.warn("failed to try cloud commit lock at table[" + name + "]", e);
+            // Restore the interrupt flag so callers can still observe the interruption.
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+
+    /**
+     * Releases the commit lock; the calling thread must be the one holding it.
+     */
+    public void commitUnlock() {
+        this.commitLock.unlock();
+    }
+
     public boolean isTypeRead() {
         return isTypeRead;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 56c3aed2833..6be3875fef6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -20,7 +20,11 @@ package org.apache.doris.cloud.transaction;
 import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TabletInvertedIndex;
+import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.cloud.proto.Cloud.AbortTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.AbortTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.BeginTxnRequest;
@@ -33,6 +37,8 @@ import org.apache.doris.cloud.proto.Cloud.CommitTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.GetCurrentMaxTxnResponse;
+import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockRequest;
+import org.apache.doris.cloud.proto.Cloud.GetDeleteBitmapUpdateLockResponse;
 import org.apache.doris.cloud.proto.Cloud.GetTxnRequest;
 import org.apache.doris.cloud.proto.Cloud.GetTxnResponse;
 import org.apache.doris.cloud.proto.Cloud.LoadJobSourceTypePB;
@@ -46,12 +52,15 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.InternalErrorCode;
 import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.QuotaExceedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.common.util.InternalDatabaseUtil;
+import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.load.loadv2.LoadJobFinalOperation;
 import org.apache.doris.load.routineload.RLTaskTxnCommitAttachment;
 import org.apache.doris.metric.MetricRepo;
@@ -60,7 +69,14 @@ import 
org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
 import org.apache.doris.persist.EditLog;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rpc.RpcException;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.CalcDeleteBitmapTask;
+import org.apache.doris.thrift.TCalcDeleteBitmapPartitionInfo;
 import org.apache.doris.thrift.TStatus;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TTaskType;
 import org.apache.doris.thrift.TUniqueId;
 import org.apache.doris.thrift.TWaitingTxnStatusRequest;
 import org.apache.doris.thrift.TWaitingTxnStatusResult;
@@ -77,8 +93,14 @@ import org.apache.doris.transaction.TxnCommitAttachment;
 import org.apache.doris.transaction.TxnStateCallbackFactory;
 import org.apache.doris.transaction.TxnStateChangeCallback;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -88,11 +110,17 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 public class CloudGlobalTransactionMgr implements GlobalTransactionMgrIface {
     private static final Logger LOG = 
LogManager.getLogger(CloudGlobalTransactionMgr.class);
     private static final String NOT_SUPPORTED_MSG = "Not supported in cloud 
mode";
+    private static final int DELETE_BITMAP_LOCK_EXPIRATION_SECONDS = 10;
+    private static final int CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS = 15;
 
     private TxnStateCallbackFactory callbackFactory;
 
@@ -295,6 +323,10 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                     "disable_load_job is set to true, all load jobs are not 
allowed");
         }
 
+        List<OlapTable> mowTableList = getMowTableList(tableList);
+        if (tabletCommitInfos != null && !tabletCommitInfos.isEmpty() && 
!mowTableList.isEmpty()) {
+            calcDeleteBitmapForMow(dbId, mowTableList, transactionId, 
tabletCommitInfos);
+        }
 
         CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
         builder.setDbId(dbId)
@@ -302,6 +334,13 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                 .setIs2Pc(is2PC)
                 .setCloudUniqueId(Config.cloud_unique_id);
 
+        // if tablet commit info is empty, no need to pass mowTableList to 
meta service.
+        if (tabletCommitInfos != null && !tabletCommitInfos.isEmpty()) {
+            for (OlapTable olapTable : mowTableList) {
+                builder.addMowTableIds(olapTable.getId());
+            }
+        }
+
         if (txnCommitAttachment != null) {
             if (txnCommitAttachment instanceof LoadJobFinalOperation) {
                 LoadJobFinalOperation loadJobFinalOperation = 
(LoadJobFinalOperation) txnCommitAttachment;
@@ -350,6 +389,11 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
                 && commitTxnResponse.getStatus().getCode() != 
MetaServiceCode.TXN_ALREADY_VISIBLE) {
             LOG.warn("commitTxn failed, transactionId:{}, retryTime:{}, 
commitTxnResponse:{}",
                     transactionId, retryTime, commitTxnResponse);
+            if (commitTxnResponse.getStatus().getCode() == 
MetaServiceCode.LOCK_EXPIRED) {
+                // DELETE_BITMAP_LOCK_ERR will be retried on be
+                throw new 
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
+                        "delete bitmap update lock expired, transactionId:" + 
transactionId);
+            }
             StringBuilder internalMsgBuilder =
                     new StringBuilder("commitTxn failed, transactionId:");
             internalMsgBuilder.append(transactionId);
@@ -372,6 +416,228 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
         }
     }
 
+    private List<OlapTable> getMowTableList(List<Table> tableList) {
+        List<OlapTable> mowTableList = new ArrayList<>();
+        for (Table table : tableList) {
+            if ((table instanceof OlapTable)) {
+                OlapTable olapTable = (OlapTable) table;
+                if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
+                    mowTableList.add(olapTable);
+                }
+            }
+        }
+        return mowTableList;
+    }
+
+    /**
+     * Drives the delete bitmap calculation for the merge-on-write tables of a
+     * transaction. The steps run in this order: group the committed tablets,
+     * take the delete bitmap update lock, read the partition versions, then
+     * send one calculation task per backend and wait for them.
+     *
+     * @param dbId              database of the transaction
+     * @param tableList         merge-on-write tables involved in the transaction
+     * @param transactionId     transaction being committed
+     * @param tabletCommitInfos tablets (with their backend) reported for the commit
+     * @throws UserException if none of the committed tablets belongs to
+     *         {@code tableList}, or if one of the steps above fails
+     */
+    private void calcDeleteBitmapForMow(long dbId, List<OlapTable> tableList, long transactionId,
+            List<TabletCommitInfo> tabletCommitInfos)
+            throws UserException {
+        // Output containers, all filled by getPartitionInfo().
+        Map<Long, Set<Long>> partitionIdsByTable = Maps.newHashMap();
+        Map<Long, Partition> partitionById = Maps.newHashMap();
+        Map<Long, Map<Long, List<Long>>> tabletsByBackend = Maps.newHashMap();
+        getPartitionInfo(tableList, tabletCommitInfos, partitionIdsByTable, partitionById,
+                tabletsByBackend);
+        if (tabletsByBackend.isEmpty()) {
+            throw new UserException(
+                    "The partition info is empty, table may be dropped, txnid=" + transactionId);
+        }
+
+        // The lock is taken before the visible versions are read.
+        // NOTE(review): presumably so the versions cannot move in between - confirm.
+        getDeleteBitmapUpdateLock(partitionIdsByTable, transactionId);
+        Map<Long, Long> versionByPartition = getPartitionVersions(partitionById);
+
+        sendCalcDeleteBitmaptask(dbId, transactionId,
+                getCalcDeleteBitmapInfo(tabletsByBackend, versionByPartition));
+    }
+
+    /**
+     * Groups the committed tablets that belong to one of {@code tableList}'s
+     * tables. The three map parameters are outputs and are filled in place.
+     *
+     * @param tableList                 tables of interest; tablets of other tables are skipped
+     * @param tabletCommitInfos         committed tablets together with their backend id
+     * @param tableToParttions          out: table id -> ids of its partitions that were written
+     * @param partitions                out: partition id -> Partition object looked up on its table
+     * @param backendToPartitionTablets out: backend id -> partition id -> tablet ids
+     */
+    private void getPartitionInfo(List<OlapTable> tableList,
+            List<TabletCommitInfo> tabletCommitInfos,
+            Map<Long, Set<Long>> tableToParttions,
+            Map<Long, Partition> partitions,
+            Map<Long, Map<Long, List<Long>>> backendToPartitionTablets) {
+        Map<Long, OlapTable> tablesById = Maps.newHashMap();
+        for (OlapTable candidate : tableList) {
+            tablesById.put(candidate.getId(), candidate);
+        }
+
+        List<Long> tabletIds = tabletCommitInfos.stream()
+                .map(TabletCommitInfo::getTabletId).collect(Collectors.toList());
+        // NOTE(review): the loop below pairs metas.get(idx) with tabletIds.get(idx) and
+        // tabletCommitInfos.get(idx), i.e. it assumes getTabletMetaList() answers
+        // position by position - confirm against TabletInvertedIndex.
+        List<TabletMeta> metas =
+                Env.getCurrentEnv().getTabletInvertedIndex().getTabletMetaList(tabletIds);
+        for (int idx = 0; idx < metas.size(); idx++) {
+            TabletMeta meta = metas.get(idx);
+            long tableId = meta.getTableId();
+            OlapTable owner = tablesById.get(tableId);
+            if (owner == null) {
+                // tablet of a table we were not asked about
+                continue;
+            }
+
+            long partitionId = meta.getPartitionId();
+            long backendId = tabletCommitInfos.get(idx).getBackendId();
+
+            tableToParttions.computeIfAbsent(tableId, k -> Sets.newHashSet()).add(partitionId);
+            backendToPartitionTablets.computeIfAbsent(backendId, k -> Maps.newHashMap())
+                    .computeIfAbsent(partitionId, k -> Lists.newArrayList())
+                    .add(tabletIds.get(idx));
+            partitions.putIfAbsent(partitionId, owner.getPartition(partitionId));
+        }
+    }
+
+    /**
+     * Computes, for every partition, the version handed to the delete bitmap
+     * calculation: visible version + 1, or 2 when the visible version is not
+     * positive.
+     *
+     * @param partitionMap partition id -> partition
+     * @return a new map partition id -> version computed as described above
+     */
+    private Map<Long, Long> getPartitionVersions(Map<Long, Partition> partitionMap) {
+        Map<Long, Long> nextVersions = Maps.newHashMap();
+        for (Map.Entry<Long, Partition> item : partitionMap.entrySet()) {
+            long visible = item.getValue().getVisibleVersion();
+            long next;
+            if (visible > 0) {
+                next = visible + 1;
+            } else {
+                next = 2;
+            }
+            nextVersions.put(item.getKey(), next);
+        }
+        return nextVersions;
+    }
+
+    /**
+     * Converts the per-backend tablet grouping into the thrift partition infos
+     * carried by the calculation tasks.
+     *
+     * @param backendToPartitionTablets backend id -> partition id -> tablet ids
+     * @param partitionVersions         partition id -> version to calculate for; must hold
+     *                                  an entry for every partition id used above, because
+     *                                  the looked-up value is unboxed
+     * @return a new map backend id -> one TCalcDeleteBitmapPartitionInfo per partition
+     */
+    private Map<Long, List<TCalcDeleteBitmapPartitionInfo>> getCalcDeleteBitmapInfo(
+            Map<Long, Map<Long, List<Long>>> backendToPartitionTablets,
+            Map<Long, Long> partitionVersions) {
+        Map<Long, List<TCalcDeleteBitmapPartitionInfo>> infosByBackend = Maps.newHashMap();
+        backendToPartitionTablets.forEach((backendId, tabletsByPartition) -> {
+            List<TCalcDeleteBitmapPartitionInfo> infos = Lists.newArrayList();
+            tabletsByPartition.forEach((partitionId, tabletIds) ->
+                    infos.add(new TCalcDeleteBitmapPartitionInfo(partitionId,
+                            partitionVersions.get(partitionId),
+                            tabletIds)));
+            infosByBackend.put(backendId, infos);
+        });
+        return infosByBackend;
+    }
+
+    /**
+     * Acquires the delete bitmap update lock from the meta service for every
+     * table that is a key of {@code tableToParttions}, one RPC per table, with
+     * {@code transactionId} as the lock id.
+     *
+     * <p>Each RPC is attempted at most {@code Config.meta_service_rpc_retry_times}
+     * times. A LOCK_CONFLICT / KV_TXN_CONFLICT answer or an RPC exception leads
+     * to another attempt after a random back-off; any other answer ends the
+     * retry loop. An interrupt during the back-off also ends the loop.
+     *
+     * @param tableToParttions table id -> partition ids; only the keys are read here
+     * @param transactionId    transaction that will hold the lock
+     * @throws UserException with DELETE_BITMAP_LOCK_ERR when the last answer
+     *         received is a conflict; a plain UserException when the meta
+     *         service answered with another non-OK code or never answered
+     */
+    private void getDeleteBitmapUpdateLock(Map<Long, Set<Long>> tableToParttions, long transactionId)
+            throws UserException {
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        for (Map.Entry<Long, Set<Long>> entry : tableToParttions.entrySet()) {
+            GetDeleteBitmapUpdateLockRequest.Builder builder =
+                    GetDeleteBitmapUpdateLockRequest.newBuilder();
+            builder.setTableId(entry.getKey())
+                    .setLockId(transactionId)
+                    .setInitiator(-1)
+                    .setExpiration(DELETE_BITMAP_LOCK_EXPIRATION_SECONDS);
+            final GetDeleteBitmapUpdateLockRequest request = builder.build();
+            GetDeleteBitmapUpdateLockResponse response = null;
+
+            // Number of RPC attempts actually made; this is the value reported in the logs.
+            int retryTime = 0;
+            while (retryTime < Config.meta_service_rpc_retry_times) {
+                retryTime++;
+                try {
+                    response = MetaServiceProxy.getInstance().getDeleteBitmapUpdateLock(request);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("get delete bitmap lock, transactionId={}, Request: {}, Response: {}",
+                                transactionId, request, response);
+                    }
+                    if (response.getStatus().getCode() != MetaServiceCode.LOCK_CONFLICT
+                            && response.getStatus().getCode() != MetaServiceCode.KV_TXN_CONFLICT) {
+                        break;
+                    }
+                } catch (Exception e) {
+                    LOG.warn("ignore get delete bitmap lock exception, transactionId={}, retryTime={}",
+                            transactionId, retryTime, e);
+                }
+                if (retryTime >= Config.meta_service_rpc_retry_times) {
+                    // No attempt left: backing off would only delay the error below.
+                    break;
+                }
+                // sleep a random time in [20, 200) ms, avoid txn conflict
+                int randomMillis = 20 + (int) (Math.random() * (200 - 20));
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("randomMillis:{}", randomMillis);
+                }
+                try {
+                    Thread.sleep(randomMillis);
+                } catch (InterruptedException e) {
+                    LOG.info("InterruptedException: ", e);
+                    // Keep the interrupt visible to the caller and stop retrying; the
+                    // checks below turn the missing/conflicting answer into an error.
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+            if (response == null) {
+                // Every attempt threw, or no attempt was allowed by the configuration.
+                // Report a checked error instead of failing with a NullPointerException.
+                LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, no response",
+                        transactionId, retryTime);
+                throw new UserException("Failed to get delete bitmap lock, no response from"
+                        + " meta service, transactionId: " + transactionId);
+            }
+            Preconditions.checkNotNull(response.getStatus());
+            if (response.getStatus().getCode() != MetaServiceCode.OK) {
+                LOG.warn("get delete bitmap lock failed, transactionId={}, for {} times, response:{}",
+                        transactionId, retryTime, response);
+                if (response.getStatus().getCode() == MetaServiceCode.LOCK_CONFLICT
+                        || response.getStatus().getCode() == MetaServiceCode.KV_TXN_CONFLICT) {
+                    // DELETE_BITMAP_LOCK_ERR will be retried on be
+                    throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
+                            "Failed to get delete bitmap lock due to confilct");
+                }
+                throw new UserException("Failed to get delete bitmap lock, code: "
+                        + response.getStatus().getCode());
+            }
+        }
+        stopWatch.stop();
+        LOG.info("get delete bitmap lock successfully. txns: {}. time cost: {} ms.",
+                 transactionId, stopWatch.getTime());
+    }
+
+    /**
+     * Sends one CalcDeleteBitmapTask per backend and blocks until all of them
+     * have reported, one of them failed, or
+     * CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS elapsed.
+     *
+     * @param dbId                    database of the transaction
+     * @param transactionId           transaction whose delete bitmaps are calculated
+     * @param backendToPartitionInfos backend id -> partitions to process on that backend;
+     *                                nothing is sent when this map is empty
+     * @throws UserException a plain UserException when a backend reported an error other
+     *         than DELETE_BITMAP_LOCK_ERROR; DELETE_BITMAP_LOCK_ERR for that error code,
+     *         for a timeout and for an interrupted wait
+     */
+    private void sendCalcDeleteBitmaptask(long dbId, long transactionId,
+            Map<Long, List<TCalcDeleteBitmapPartitionInfo>> backendToPartitionInfos)
+            throws UserException {
+        if (backendToPartitionInfos.isEmpty()) {
+            return;
+        }
+        StopWatch stopWatch = new StopWatch();
+        stopWatch.start();
+        int totalTaskNum = backendToPartitionInfos.size();
+        MarkedCountDownLatch<Long, Long> countDownLatch = new MarkedCountDownLatch<Long, Long>(
+                totalTaskNum);
+        AgentBatchTask batchTask = new AgentBatchTask();
+        for (Map.Entry<Long, List<TCalcDeleteBitmapPartitionInfo>> entry
+                : backendToPartitionInfos.entrySet()) {
+            CalcDeleteBitmapTask task = new CalcDeleteBitmapTask(entry.getKey(),
+                    transactionId,
+                    dbId,
+                    entry.getValue(),
+                    countDownLatch);
+            // one mark per backend: (backend id, transaction id)
+            countDownLatch.addMark(entry.getKey(), transactionId);
+            // add to AgentTaskQueue for handling finish report.
+            // not check return value, because the add will success
+            AgentTaskQueue.addTask(task);
+            batchTask.addTask(task);
+        }
+        AgentTaskExecutor.submit(batchTask);
+
+        boolean ok;
+        try {
+            ok = countDownLatch.await(CALCULATE_DELETE_BITMAP_TASK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOG.warn("InterruptedException: ", e);
+            // Restore the interrupt flag so callers further up can still observe it;
+            // the wait is then treated like a failed one.
+            Thread.currentThread().interrupt();
+            ok = false;
+        }
+
+        if (!ok || !countDownLatch.getStatus().ok()) {
+            String errMsg = "Failed to calculate delete bitmap.";
+            // clear tasks
+            AgentTaskQueue.removeBatchTask(batchTask, TTaskType.CALCULATE_DELETE_BITMAP);
+
+            if (!countDownLatch.getStatus().ok()) {
+                errMsg += countDownLatch.getStatus().getErrorMsg();
+                if (countDownLatch.getStatus().getErrorCode() != TStatusCode.DELETE_BITMAP_LOCK_ERROR) {
+                    throw new UserException(errMsg);
+                }
+            } else {
+                errMsg += " Timeout.";
+                List<Entry<Long, Long>> unfinishedMarks = countDownLatch.getLeftMarks();
+                // only show at most 3 results
+                List<Entry<Long, Long>> subList = unfinishedMarks.subList(0,
+                        Math.min(unfinishedMarks.size(), 3));
+                if (!subList.isEmpty()) {
+                    errMsg += " Unfinished mark: " + Joiner.on(", ").join(subList);
+                }
+            }
+            LOG.warn(errMsg);
+            // DELETE_BITMAP_LOCK_ERR will be retried on be
+            throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR, errMsg);
+        }
+        stopWatch.stop();
+        LOG.info("calc delete bitmap task successfully. txns: {}. time cost: {} ms.",
+                transactionId, stopWatch.getTime());
+    }
+
     @Override
     public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> 
tableList, long transactionId,
                                                List<TabletCommitInfo> 
tabletCommitInfos, long timeoutMillis)
@@ -383,7 +649,17 @@ public class CloudGlobalTransactionMgr implements 
GlobalTransactionMgrIface {
     public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> 
tableList, long transactionId,
                                                List<TabletCommitInfo> 
tabletCommitInfos, long timeoutMillis,
                                                TxnCommitAttachment 
txnCommitAttachment) throws UserException {
-        commitTransaction(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
+        // Try to take the commit lock of every table in tableList, passing timeoutMillis
+        // as the wait time. If that fails nothing is committed and the error below is thrown.
+        if (!MetaLockUtils.tryCommitLockTables(tableList, timeoutMillis, 
TimeUnit.MILLISECONDS)) {
+            // DELETE_BITMAP_LOCK_ERR will be retried on be
+            throw new UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
+                    "get table cloud commit lock timeout, tableList=("
+                            + StringUtils.join(tableList, ",") + ")");
+        }
+        try {
+            commitTransaction(db.getId(), tableList, transactionId, 
tabletCommitInfos, txnCommitAttachment);
+        } finally {
+            // Release the commit locks whether commitTransaction returned or threw.
+            MetaLockUtils.commitUnlockTables(tableList);
+        }
         return true;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
index 2bbd5c58efa..944f8e04860 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/InternalErrorCode.java
@@ -34,7 +34,10 @@ public enum InternalErrorCode {
     MANUAL_STOP_ERR(101),
     TOO_MANY_FAILURE_ROWS_ERR(102),
     CREATE_TASKS_ERR(103),
-    TASKS_ABORT_ERR(104);
+    TASKS_ABORT_ERR(104),
+
+    // for MoW table
+    DELETE_BITMAP_LOCK_ERR(301);
 
     private long errCode;
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
index ece62cbc10d..084c6f25f79 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/MetaLockUtils.java
@@ -18,6 +18,7 @@
 package org.apache.doris.common.util;
 
 import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.MetaNotFoundException;
 
@@ -112,4 +113,27 @@ public class MetaLockUtils {
         }
     }
 
+    /**
+     * Takes the commit lock of every table, in list order, by calling
+     * {@code commitLock()} on each element.
+     *
+     * @param tableList tables to lock
+     */
+    public static void commitLockTables(List<Table> tableList) {
+        tableList.forEach(Table::commitLock);
+    }
+
+    public static void commitUnlockTables(List<Table> tableList) {
+        for (int i = tableList.size() - 1; i >= 0; i--) {
+            tableList.get(i).commitUnlock();
+        }
+    }
+
+    public static boolean tryCommitLockTables(List<Table> tableList, long 
timeout, TimeUnit unit) {
+        for (int i = 0; i < tableList.size(); i++) {
+            if (!tableList.get(i).tryCommitLock(timeout, unit)) {
+                for (int j = i - 1; j >= 0; j--) {
+                    tableList.get(j).commitUnlock();
+                }
+                return false;
+            }
+        }
+        return true;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 5625e9626de..9e07574e57a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -1439,7 +1439,7 @@ public class PropertyAnalyzer {
     // the user doesn't specify the property in 
`CreateTableStmt`/`CreateTableInfo`
     public static Map<String, String> 
enableUniqueKeyMergeOnWriteIfNotExists(Map<String, String> properties) {
         if (Config.isCloudMode()) {
-            // FIXME: MOW is not supported in cloud mode yet.
+            // In cloud mode enable_unique_key_merge_on_write still defaults to false,
+            // so the properties are returned unchanged.
             return properties;
         }
         if (properties != null && 
properties.get(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE) == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 485463d8daf..12f1600cd3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -37,6 +37,7 @@ import org.apache.doris.task.AgentTask;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.AlterInvertedIndexTask;
 import org.apache.doris.task.AlterReplicaTask;
+import org.apache.doris.task.CalcDeleteBitmapTask;
 import org.apache.doris.task.CheckConsistencyTask;
 import org.apache.doris.task.ClearAlterTask;
 import org.apache.doris.task.CloneTask;
@@ -137,7 +138,8 @@ public class MasterImpl {
                         && taskType != TTaskType.DOWNLOAD && taskType != 
TTaskType.MOVE
                         && taskType != TTaskType.CLONE && taskType != 
TTaskType.PUBLISH_VERSION
                         && taskType != TTaskType.CREATE && taskType != 
TTaskType.UPDATE_TABLET_META_INFO
-                        && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE) {
+                        && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE
+                        && taskType != TTaskType.CALCULATE_DELETE_BITMAP) {
                     return result;
                 }
             }
@@ -203,6 +205,9 @@ public class MasterImpl {
                 case PUSH_COOLDOWN_CONF:
                     finishPushCooldownConfTask(task);
                     break;
+                case CALCULATE_DELETE_BITMAP:
+                    finishCalcDeleteBitmap(task, request);
+                    break;
                 default:
                     break;
             }
@@ -628,4 +633,28 @@ public class MasterImpl {
         cooldownTask.setFinished(true);
         AgentTaskQueue.removeTask(task.getBackendId(), 
TTaskType.PUSH_COOLDOWN_CONF, task.getSignature());
     }
+
+    /**
+     * Handles the finish report of a CALCULATE_DELETE_BITMAP task.
+     *
+     * <p>A non-OK task status releases the whole latch with that status code and
+     * a message describing the backend; an OK status counts down the mark of
+     * this backend only. In both cases the task is removed from AgentTaskQueue
+     * (finally block).
+     *
+     * @param task    the task being reported; must be a CalcDeleteBitmapTask
+     * @param request the finish report sent by the backend
+     */
+    private void finishCalcDeleteBitmap(AgentTask task, TFinishTaskRequest request) {
+        try {
+            CalcDeleteBitmapTask calcDeleteBitmapTask = (CalcDeleteBitmapTask) task;
+            if (request.getTaskStatus().getStatusCode() != TStatusCode.OK) {
+                // The error message list is appended through string concatenation rather
+                // than toString(): a report without error messages then yields "null"
+                // instead of a NullPointerException that would skip the count down.
+                calcDeleteBitmapTask.countDownToZero(request.getTaskStatus().getStatusCode(),
+                        "backend: " + task.getBackendId() + ", error_tablet_size: "
+                                + request.getErrorTabletIdsSize() + ", err_msg: "
+                                + request.getTaskStatus().getErrorMsgs());
+            } else {
+                calcDeleteBitmapTask.countDownLatch(task.getBackendId(),
+                        calcDeleteBitmapTask.getTransactionId());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("finish calc delete bitmap. transaction id: {}, be: {}, report version: {}",
+                            calcDeleteBitmapTask.getTransactionId(), calcDeleteBitmapTask.getBackendId(),
+                            request.getReportVersion());
+                }
+            }
+        } finally {
+            AgentTaskQueue.removeTask(task.getBackendId(),
+                    TTaskType.CALCULATE_DELETE_BITMAP, task.getSignature());
+        }
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
index 3620f2e0413..400db4fc65f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateTableInfo.java
@@ -225,12 +225,6 @@ public class CreateTableInfo {
             properties = Maps.newHashMap();
         }
 
-        if (Config.isCloudMode() && properties != null
-                && 
properties.containsKey(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE)) {
-            // FIXME: MOW is not supported in cloud mode yet.
-            properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, 
"false");
-        }
-
         if (Strings.isNullOrEmpty(engineName) || 
engineName.equalsIgnoreCase("olap")) {
             if (distribution == null) {
                 throw new AnalysisException("Create olap table should contain 
distribution desc");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index 1f2e662c757..1de23992fed 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -26,6 +26,7 @@ import org.apache.doris.thrift.TAgentServiceVersion;
 import org.apache.doris.thrift.TAgentTaskRequest;
 import org.apache.doris.thrift.TAlterInvertedIndexReq;
 import org.apache.doris.thrift.TAlterTabletReqV2;
+import org.apache.doris.thrift.TCalcDeleteBitmapRequest;
 import org.apache.doris.thrift.TCheckConsistencyReq;
 import org.apache.doris.thrift.TClearAlterTaskRequest;
 import org.apache.doris.thrift.TClearTransactionTaskRequest;
@@ -392,6 +393,15 @@ public class AgentBatchTask implements Runnable {
                 tAgentTaskRequest.setGcBinlogReq(request);
                 return tAgentTaskRequest;
             }
+            case CALCULATE_DELETE_BITMAP: {
+                CalcDeleteBitmapTask calcDeleteBitmapTask = 
(CalcDeleteBitmapTask) task;
+                TCalcDeleteBitmapRequest request = 
calcDeleteBitmapTask.toThrift();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(request.toString());
+                }
+                tAgentTaskRequest.setCalcDeleteBitmapReq(request);
+                return tAgentTaskRequest;
+            }
             default:
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("could not find task type for task [{}]", task);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java
new file mode 100644
index 00000000000..4188cf61849
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CalcDeleteBitmapTask.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.common.MarkedCountDownLatch;
+import org.apache.doris.common.Status;
+import org.apache.doris.thrift.TCalcDeleteBitmapPartitionInfo;
+import org.apache.doris.thrift.TCalcDeleteBitmapRequest;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TTaskType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CalcDeleteBitmapTask extends AgentTask  {
+    private static final Logger LOG = 
LogManager.getLogger(CreateReplicaTask.class);
+    // used for synchronous process
+    private MarkedCountDownLatch<Long, Long> latch;
+    private long transactionId;
+    private List<TCalcDeleteBitmapPartitionInfo> partitionInfos;
+    private List<Long> errorTablets;
+
+    public CalcDeleteBitmapTask(long backendId, long transactionId, long dbId,
+            List<TCalcDeleteBitmapPartitionInfo> partitionInfos,
+            MarkedCountDownLatch<Long, Long> latch) {
+        super(null, backendId, TTaskType.CALCULATE_DELETE_BITMAP, dbId, -1L, 
-1L, -1L, -1L, transactionId);
+        this.transactionId = transactionId;
+        this.partitionInfos = partitionInfos;
+        this.errorTablets = new ArrayList<Long>();
+        this.isFinished = false;
+        this.latch = latch;
+    }
+
+    public void countDownLatch(long backendId, long transactionId) {
+        if (this.latch != null) {
+            if (latch.markedCountDown(backendId, transactionId)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("CalcDeleteBitmapTask current latch count: {}, 
backend: {}, transactionId:{}",
+                            latch.getCount(), backendId, transactionId);
+                }
+            }
+        }
+    }
+
+    // call this always means one of tasks is failed. count down to zero to 
finish entire task
+    public void countDownToZero(String errMsg) {
+        if (this.latch != null) {
+            latch.countDownToZero(new Status(TStatusCode.CANCELLED, errMsg));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("CalcDeleteBitmapTask download to zero. error msg: 
{}", errMsg);
+            }
+        }
+    }
+
+    public void countDownToZero(TStatusCode code, String errMsg) {
+        if (this.latch != null) {
+            latch.countDownToZero(new Status(code, errMsg));
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("CalcDeleteBitmapTask download to zero. error msg: 
{}", errMsg);
+            }
+        }
+    }
+
+    public void setLatch(MarkedCountDownLatch<Long, Long> latch) {
+        this.latch = latch;
+    }
+
+    public TCalcDeleteBitmapRequest toThrift() {
+        TCalcDeleteBitmapRequest calcDeleteBitmapRequest = new 
TCalcDeleteBitmapRequest(transactionId,
+                partitionInfos);
+        return calcDeleteBitmapRequest;
+    }
+
+    public long getTransactionId() {
+        return transactionId;
+    }
+
+    public List<TCalcDeleteBitmapPartitionInfo> 
getCalcDeleteBimapPartitionInfos() {
+        return partitionInfos;
+    }
+
+    public synchronized List<Long> getErrorTablets() {
+        return errorTablets;
+    }
+
+    public synchronized void addErrorTablets(List<Long> errorTablets) {
+        this.errorTablets.clear();
+        if (errorTablets == null) {
+            return;
+        }
+        this.errorTablets.addAll(errorTablets);
+    }
+
+    public void setIsFinished(boolean isFinished) {
+        this.isFinished = isFinished;
+    }
+
+    public boolean isFinished() {
+        return isFinished;
+    }
+}
diff --git 
a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy 
b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
index 1518def58c1..ba7214cc43c 100644
--- a/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
+++ b/regression-test/pipeline/cloud_p0/conf/regression-conf-custom.groovy
@@ -16,7 +16,7 @@
 // under the License.
 
 testGroups = "p0"
-testDirectories = 
"ddl_p0,database_p0,load,load_p0,query_p0,table_p0,account_p0,autobucket,bitmap_functions,bloom_filter_p0,cast_decimal_to_boolean,cast_double_to_decimal,compression_p0,connector_p0,correctness,correctness_p0,csv_header_p0,data_model_p0,database_p0,datatype_p0,delete_p0,demo_p0,empty_relation,export_p0,external_table_p0,fault_injection_p0,flink_connector_p0,insert_overwrite_p0,insert_p0,internal_schema_p0,javaudf_p0,job_p0,json_p0,jsonb_p0,meta_action_p0,metrics_p0,mtmv_
 [...]
+testDirectories = 
"ddl_p0,database_p0,load,load_p0,query_p0,table_p0,account_p0,autobucket,bitmap_functions,bloom_filter_p0,cast_decimal_to_boolean,cast_double_to_decimal,compression_p0,connector_p0,correctness,correctness_p0,csv_header_p0,data_model_p0,database_p0,datatype_p0,delete_p0,demo_p0,empty_relation,export_p0,external_table_p0,fault_injection_p0,flink_connector_p0,insert_overwrite_p0,insert_p0,internal_schema_p0,javaudf_p0,job_p0,json_p0,jsonb_p0,meta_action_p0,metrics_p0,mtmv_
 [...]
 //exclude groups and exclude suites is more prior than include groups and 
include suites.
-excludeSuites = 
"test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external"
-excludeDirectories = "workload_manager_p1,nereids_rules_p0/subquery"
+excludeSuites = 
"test_index_failure_injection,test_dump_image,test_profile,test_spark_load,test_refresh_mtmv,test_bitmap_filter,test_information_schema_external,test_primary_key_partial_update_parallel"
+excludeDirectories = 
"workload_manager_p1,nereids_rules_p0/subquery,unique_with_mow_p0/cluster_key,unique_with_mow_p0/ssb_unique_sql_zstd_cluster"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to