This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 31ddc47e231 branch-4.0: [improvement](recycler) Avoid single-point read/write during sequentially reading key #62476 (#63122)
31ddc47e231 is described below

commit 31ddc47e2312464b882a687f91593c81c3e60160
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 15 21:10:25 2026 -0700

    branch-4.0: [improvement](recycler) Avoid single-point read/write during sequentially reading key #62476 (#63122)
    
    Cherry-picked from #62476
    
    Co-authored-by: Yixuan Wang <[email protected]>
---
 cloud/src/recycler/recycler.cpp | 446 +++++++++++++++++++++++++++++-----------
 cloud/src/recycler/recycler.h   |   3 +-
 2 files changed, 328 insertions(+), 121 deletions(-)

diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 59f43596037..aab1daba813 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -36,6 +36,7 @@
 #include <initializer_list>
 #include <memory>
 #include <numeric>
+#include <optional>
 #include <random>
 #include <string>
 #include <string_view>
@@ -1838,90 +1839,243 @@ int 
InstanceRecycler::abort_job_for_related_rowset(const RowsetMetaCloudPB& rows
 }
 
 template <typename T>
-int InstanceRecycler::abort_txn_or_job_for_recycle(T& rowset_meta_pb) {
-    RowsetMetaCloudPB* rs_meta;
-    RecycleRowsetPB::Type rowset_type = RecycleRowsetPB::PREPARE;
+RowsetMetaCloudPB* mutable_rowset_meta(T& rowset_meta_pb) {
+    if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
+        return rowset_meta_pb.mutable_rowset_meta();
+    } else {
+        return &rowset_meta_pb;
+    }
+}
 
+template <typename T>
+const RowsetMetaCloudPB& rowset_meta(const T& rowset_meta_pb) {
     if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
-        // For keys that are not in the RecycleRowsetPB::PREPARE state
-        // we do not need to check the job or txn state
-        // because tmp_rowset_key already exists when this key is generated.
-        rowset_type = rowset_meta_pb.type();
-        rs_meta = rowset_meta_pb.mutable_rowset_meta();
+        return rowset_meta_pb.rowset_meta();
     } else {
-        rs_meta = &rowset_meta_pb;
+        return rowset_meta_pb;
     }
+}
+
+struct DeferredRecycleAbortTask {
+    enum class Type : uint8_t {
+        TXN,
+        JOB,
+    };
 
-    DCHECK(rs_meta != nullptr);
+    Type type = Type::TXN;
+    int64_t txn_id = 0;
+    int64_t tablet_id = 0;
+    int64_t start_version = 0;
+    int64_t end_version = 0;
+    std::string rowset_id;
+    std::string job_id;
+};
+
+struct DeferredRecyclePrepareDeleteTask {
+    std::string key;
+    std::string resource_id;
+    std::string rowset_id;
+    int64_t tablet_id = 0;
+};
 
-    // compaction/sc will generate recycle_rowset_key for each input rowset 
with load_id
-    // we need skip them because the related txn has been finished
-    // load_rowset1 load_rowset2 => pick for compaction => compact_rowset
-    // compact_rowset1 compact_rowset2 => pick for compaction/sc job => 
new_rowset
-    if (rowset_type == RecycleRowsetPB::PREPARE) {
-        if (rs_meta->has_load_id()) {
-            // load
-            return abort_txn_for_related_rowset(rs_meta->txn_id());
-        } else if (rs_meta->has_job_id()) {
-            // compaction / schema change
-            return abort_job_for_related_rowset(*rs_meta);
+template <typename T>
+std::optional<DeferredRecycleAbortTask> make_deferred_abort_task(const T& 
rowset_meta_pb) {
+    if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
+        if (rowset_meta_pb.type() != RecycleRowsetPB::PREPARE) {
+            return std::nullopt;
         }
     }
 
-    return 0;
+    const auto& rs_meta = rowset_meta(rowset_meta_pb);
+    DeferredRecycleAbortTask task;
+    task.tablet_id = rs_meta.tablet_id();
+    task.start_version = rs_meta.start_version();
+    task.end_version = rs_meta.end_version();
+    if (rs_meta.has_load_id()) {
+        task.type = DeferredRecycleAbortTask::Type::TXN;
+        task.txn_id = rs_meta.txn_id();
+        return task;
+    }
+    if (rs_meta.has_job_id()) {
+        task.type = DeferredRecycleAbortTask::Type::JOB;
+        task.rowset_id = rs_meta.rowset_id_v2();
+        task.job_id = rs_meta.job_id();
+        return task;
+    }
+    return std::nullopt;
 }
 
 template <typename T>
-int mark_rowset_as_recycled(TxnKv* txn_kv, const std::string& instance_id, 
std::string_view key,
-                            T& rowset_meta_pb) {
-    RowsetMetaCloudPB* rs_meta;
+bool need_mark_rowset_as_recycled(const T& rowset_meta_pb) {
+    const auto& rs_meta = rowset_meta(rowset_meta_pb);
+    return !rs_meta.has_is_recycled() || !rs_meta.is_recycled();
+}
 
-    if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
-        rs_meta = rowset_meta_pb.mutable_rowset_meta();
-    } else {
-        rs_meta = &rowset_meta_pb;
+template <typename T>
+int batch_mark_rowsets_as_recycled(TxnKv* txn_kv, const std::string& 
instance_id,
+                                   const std::vector<std::string>& keys) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to create txn, instance_id=" << instance_id;
+        return -1;
     }
-
-    bool need_write_back = false;
-    if ((!rs_meta->has_is_recycled() || !rs_meta->is_recycled())) {
-        need_write_back = true;
-        rs_meta->set_is_recycled(true);
+    std::vector<std::optional<std::string>> values;
+    err = txn->batch_get(&values, keys);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to batch get rowset meta, instance_id=" << 
instance_id << ' '
+                     << "keys size=" << keys.size() << ' ' << "err=" << err;
+        return -1;
+    }
+    size_t total_keys = keys.size();
+    for (size_t i = 0; i < total_keys; i++) {
+        if (!values[i].has_value()) {
+            // has already been removed by commit_rowset
+            continue;
+        }
+        auto key = keys[i];
+        auto val = values[i].value();
+        T rowset_meta_pb;
+        if (!rowset_meta_pb.ParseFromString(val)) {
+            LOG(WARNING) << "failed to parse rowset meta, instance_id=" << 
instance_id
+                         << " key=" << hex(key);
+            return -1;
+        }
+        if (!need_mark_rowset_as_recycled(rowset_meta_pb)) {
+            continue;
+        }
+        mutable_rowset_meta(rowset_meta_pb)->set_is_recycled(true);
+        val.clear();
+        rowset_meta_pb.SerializeToString(&val);
+        txn->put(key, val);
+    }
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to commit txn, instance_id=" << instance_id;
+        return -1;
     }
 
-    if (need_write_back) {
+    return 0;
+}
+
+template <typename T>
+int collect_deferred_abort_tasks(TxnKv* txn_kv, const std::string& instance_id,
+                                 const std::vector<std::string>& keys,
+                                 std::vector<DeferredRecycleAbortTask>* 
abort_tasks,
+                                 bool skip_base_version) {
+    constexpr size_t kAbortCheckBatchSize = 256;
+    for (size_t offset = 0; offset < keys.size(); offset += 
kAbortCheckBatchSize) {
+        size_t limit = std::min(keys.size(), offset + kAbortCheckBatchSize);
         std::unique_ptr<Transaction> txn;
         TxnErrorCode err = txn_kv->create_txn(&txn);
         if (err != TxnErrorCode::TXN_OK) {
             LOG(WARNING) << "failed to create txn, instance_id=" << 
instance_id;
             return -1;
         }
-        // double check becase of new transaction
-        T rowset_meta;
-        std::string val;
-        err = txn->get(key, &val);
-        if (!rowset_meta.ParseFromString(val)) {
-            LOG(WARNING) << "failed to parse rs_meta, instance_id=" << 
instance_id;
-            return -1;
+        for (size_t idx = offset; idx < limit; ++idx) {
+            const std::string& key = keys[idx];
+            std::string val;
+            err = txn->get(key, &val);
+            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                // has already been removed
+                continue;
+            }
+            if (err != TxnErrorCode::TXN_OK) {
+                LOG(WARNING) << "failed to get rowset meta, instance_id=" << 
instance_id
+                             << " key=" << hex(key);
+                return -1;
+            }
+            T rowset_meta_pb;
+            if (!rowset_meta_pb.ParseFromString(val)) {
+                LOG(WARNING) << "failed to parse rowset meta, instance_id=" << 
instance_id
+                             << " key=" << hex(key);
+                return -1;
+            }
+            if (skip_base_version && rowset_meta(rowset_meta_pb).end_version() 
== 1) {
+                continue;
+            }
+            if (auto abort_task = make_deferred_abort_task(rowset_meta_pb);
+                abort_task.has_value()) {
+                abort_tasks->emplace_back(std::move(*abort_task));
+            }
         }
-        if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
-            rs_meta = rowset_meta.mutable_rowset_meta();
+    }
+    return 0;
+}
+
+template <typename T>
+int InstanceRecycler::batch_abort_txn_or_job_for_recycle(const 
std::vector<std::string>& keys,
+                                                         bool 
skip_base_version) {
+    std::vector<DeferredRecycleAbortTask> abort_tasks;
+    if (collect_deferred_abort_tasks<T>(txn_kv_.get(), instance_id_, keys, 
&abort_tasks,
+                                        skip_base_version) != 0) {
+        LOG(WARNING) << "failed to collect rowset abort tasks, instance_id=" 
<< instance_id_;
+        return -1;
+    }
+    for (const auto& abort_task : abort_tasks) {
+        LOG(INFO) << "begin to abort txn or job for related rowset, 
instance_id=" << instance_id_
+                  << " tablet_id=" << abort_task.tablet_id << " version=["
+                  << abort_task.start_version << '-' << abort_task.end_version 
<< "]";
+        int abort_ret = 0;
+        if (abort_task.type == DeferredRecycleAbortTask::Type::TXN) {
+            abort_ret = abort_txn_for_related_rowset(abort_task.txn_id);
         } else {
-            rs_meta = &rowset_meta;
+            RowsetMetaCloudPB rowset_meta;
+            rowset_meta.set_tablet_id(abort_task.tablet_id);
+            rowset_meta.set_rowset_id_v2(abort_task.rowset_id);
+            rowset_meta.set_job_id(abort_task.job_id);
+            abort_ret = abort_job_for_related_rowset(rowset_meta);
         }
-        if ((rs_meta->has_is_recycled() && rs_meta->is_recycled())) {
-            return 0;
+        if (abort_ret != 0) {
+            LOG(WARNING) << "failed to abort txn or job for related rowset, 
instance_id="
+                         << instance_id_ << " tablet_id=" << 
abort_task.tablet_id << " version=["
+                         << abort_task.start_version << '-' << 
abort_task.end_version << "]";
+            return abort_ret;
         }
-        rs_meta->set_is_recycled(true);
-        val.clear();
-        rowset_meta.SerializeToString(&val);
-        txn->put(key, val);
-        err = txn->commit();
+    }
+    return 0;
+}
+
+int collect_prepare_delete_tasks(TxnKv* txn_kv, const std::string& instance_id,
+                                 const std::vector<std::string>& keys,
+                                 
std::vector<DeferredRecyclePrepareDeleteTask>* delete_tasks) {
+    constexpr size_t kPrepareCheckBatchSize = 256;
+    for (size_t offset = 0; offset < keys.size(); offset += 
kPrepareCheckBatchSize) {
+        size_t limit = std::min(keys.size(), offset + kPrepareCheckBatchSize);
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv->create_txn(&txn);
         if (err != TxnErrorCode::TXN_OK) {
-            LOG(WARNING) << "failed to commit txn, instance_id=" << 
instance_id;
+            LOG(WARNING) << "failed to create txn, instance_id=" << 
instance_id;
             return -1;
         }
+        for (size_t idx = offset; idx < limit; ++idx) {
+            const std::string& key = keys[idx];
+            std::string val;
+            err = txn->get(key, &val);
+            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                // has already been removed
+                continue;
+            }
+            if (err != TxnErrorCode::TXN_OK) {
+                LOG(WARNING) << "failed to get recycle rowset, instance_id=" 
<< instance_id
+                             << " key=" << hex(key);
+                return -1;
+            }
+            RecycleRowsetPB rowset;
+            if (!rowset.ParseFromString(val)) {
+                LOG(WARNING) << "failed to parse recycle rowset, instance_id=" 
<< instance_id
+                             << " key=" << hex(key);
+                return -1;
+            }
+            if (rowset.type() != RecycleRowsetPB::PREPARE) {
+                continue;
+            }
+            const auto& rs_meta = rowset.rowset_meta();
+            delete_tasks->push_back(
+                    {key, rs_meta.resource_id(), rs_meta.rowset_id_v2(), 
rs_meta.tablet_id()});
+        }
     }
-    return need_write_back ? 1 : 0;
+    return 0;
 }
 
 int InstanceRecycler::recycle_ref_rowsets(bool* has_unrecycled_rowsets) {
@@ -4728,6 +4882,9 @@ int InstanceRecycler::recycle_rowsets() {
     };
 
     std::vector<std::string> rowset_keys;
+    std::vector<std::string> rowset_keys_to_mark_recycled;
+    std::vector<std::string> rowset_keys_to_abort;
+    std::vector<std::string> prepare_rowset_keys_to_delete;
     // rowset_id -> rowset_meta
     // store rowset id and meta for statistics rs size when delete
     std::map<std::string, doris::RowsetMetaCloudPB> rowsets;
@@ -4854,38 +5011,26 @@ int InstanceRecycler::recycle_rowsets() {
 
         auto* rowset_meta = rowset.mutable_rowset_meta();
         if (config::enable_mark_delete_rowset_before_recycle) {
-            int mark_ret = mark_rowset_as_recycled(txn_kv_.get(), 
instance_id_, k, rowset);
-            if (mark_ret == -1) {
-                LOG(WARNING) << "failed to mark rowset as recycled, 
instance_id=" << instance_id_
-                             << " tablet_id=" << rowset_meta->tablet_id() << " 
version=["
-                             << rowset_meta->start_version() << '-' << 
rowset_meta->end_version()
-                             << "]";
-                return -1;
-            } else if (mark_ret == 1) {
-                LOG(INFO)
-                        << "rowset already marked as recycled, recycler will 
delete data and kv at "
-                           "next turn, instance_id="
-                        << instance_id_ << " tablet_id=" << 
rowset_meta->tablet_id() << " version=["
-                        << rowset_meta->start_version() << '-' << 
rowset_meta->end_version() << "]";
+            if (need_mark_rowset_as_recycled(rowset)) {
+                rowset_keys_to_mark_recycled.emplace_back(k);
+                LOG(INFO) << "rowset queued to mark as recycled, recycler will 
delete data and kv "
+                             "at next turn, instance_id="
+                          << instance_id_ << " tablet_id=" << 
rowset_meta->tablet_id()
+                          << " version=[" << rowset_meta->start_version() << 
'-'
+                          << rowset_meta->end_version() << "]";
                 return 0;
             }
         }
 
-        if (config::enable_abort_txn_and_job_for_delete_rowset_before_recycle) 
{
-            LOG(INFO) << "begin to abort txn or job for related rowset, 
instance_id="
-                      << instance_id_ << " tablet_id=" << 
rowset_meta->tablet_id() << " version=["
-                      << rowset_meta->start_version() << '-' << 
rowset_meta->end_version() << "]";
-
-            if (rowset_meta->end_version() != 1) {
-                int ret = abort_txn_or_job_for_recycle(rowset);
-
-                if (ret != 0) {
-                    LOG(WARNING) << "failed to abort txn or job for related 
rowset, instance_id="
-                                 << instance_id_ << " tablet_id=" << 
rowset.tablet_id()
-                                 << " version=[" << 
rowset_meta->start_version() << '-'
-                                 << rowset_meta->end_version() << "]";
-                    return ret;
-                }
+        if (config::enable_abort_txn_and_job_for_delete_rowset_before_recycle 
&&
+            rowset_meta->end_version() != 1) {
+            if (make_deferred_abort_task(rowset).has_value()) {
+                LOG(INFO) << "rowset queued to abort related txn or job after 
current scan batch, "
+                             "instance_id="
+                          << instance_id_ << " tablet_id=" << 
rowset_meta->tablet_id()
+                          << " version=[" << rowset_meta->start_version() << 
'-'
+                          << rowset_meta->end_version() << "]";
+                rowset_keys_to_abort.emplace_back(k);
             }
         }
 
@@ -4911,11 +5056,7 @@ int InstanceRecycler::recycle_rowsets() {
         if (rowset.type() == RecycleRowsetPB::PREPARE) {
             // unable to calculate file path, can only be deleted by rowset id 
prefix
             num_prepare += 1;
-            if (delete_rowset_data_by_prefix(std::string(k), 
rowset_meta->resource_id(),
-                                             rowset_meta->tablet_id(),
-                                             rowset_meta->rowset_id_v2()) != 
0) {
-                return -1;
-            }
+            prepare_rowset_keys_to_delete.emplace_back(k);
         } else {
             num_compacted += rowset.type() == RecycleRowsetPB::COMPACT;
             rowset_keys.emplace_back(k);
@@ -4929,13 +5070,63 @@ int InstanceRecycler::recycle_rowsets() {
 
     auto loop_done = [&]() -> int {
         std::vector<std::string> rowset_keys_to_delete;
+        std::vector<std::string> mark_keys_to_process;
+        std::vector<std::string> abort_keys_to_process;
+        std::vector<std::string> prepare_keys_to_process;
         // rowset_id -> rowset_meta
         // store rowset id and meta for statistics rs size when delete
         std::map<std::string, doris::RowsetMetaCloudPB> rowsets_to_delete;
         rowset_keys_to_delete.swap(rowset_keys);
+        mark_keys_to_process.swap(rowset_keys_to_mark_recycled);
+        abort_keys_to_process.swap(rowset_keys_to_abort);
+        prepare_keys_to_process.swap(prepare_rowset_keys_to_delete);
         rowsets_to_delete.swap(rowsets);
         worker_pool->submit([&, rowset_keys_to_delete = 
std::move(rowset_keys_to_delete),
-                             rowsets_to_delete = 
std::move(rowsets_to_delete)]() {
+                             rowsets_to_delete = std::move(rowsets_to_delete),
+                             prepare_keys_to_process = 
std::move(prepare_keys_to_process),
+                             mark_keys_to_process = 
std::move(mark_keys_to_process),
+                             abort_keys_to_process = 
std::move(abort_keys_to_process)]() mutable {
+            if (!mark_keys_to_process.empty() &&
+                batch_mark_rowsets_as_recycled<RecycleRowsetPB>(txn_kv_.get(), 
instance_id_,
+                                                                
mark_keys_to_process) != 0) {
+                LOG(WARNING) << "failed to batch mark recycle rowsets as 
recycled, instance_id="
+                             << instance_id_;
+                return;
+            }
+            if (!abort_keys_to_process.empty() &&
+                
batch_abort_txn_or_job_for_recycle<RecycleRowsetPB>(abort_keys_to_process, 
true) !=
+                        0) {
+                return;
+            }
+            std::vector<DeferredRecyclePrepareDeleteTask> prepare_delete_tasks;
+            if (!prepare_keys_to_process.empty() &&
+                collect_prepare_delete_tasks(txn_kv_.get(), instance_id_, 
prepare_keys_to_process,
+                                             &prepare_delete_tasks) != 0) {
+                LOG(WARNING) << "failed to collect prepare rowset delete 
tasks, instance_id="
+                             << instance_id_;
+                return;
+            }
+            if (!prepare_delete_tasks.empty()) {
+                std::vector<std::string> prepare_rowset_keys_to_delete;
+                
prepare_rowset_keys_to_delete.reserve(prepare_delete_tasks.size());
+                for (const auto& task : prepare_delete_tasks) {
+                    if (delete_rowset_data(task.resource_id, task.tablet_id, 
task.rowset_id) != 0) {
+                        LOG(WARNING) << "failed to delete rowset data, key=" 
<< hex(task.key);
+                        return;
+                    }
+                    if (delete_versioned_delete_bitmap_kvs(task.tablet_id, 
task.rowset_id) != 0) {
+                        return;
+                    }
+                    prepare_rowset_keys_to_delete.emplace_back(task.key);
+                }
+                if (txn_remove(txn_kv_.get(), prepare_rowset_keys_to_delete) 
!= 0) {
+                    LOG(WARNING) << "failed to delete recycle rowset kv, 
instance_id="
+                                 << instance_id_;
+                    return;
+                }
+                num_recycled.fetch_add(prepare_rowset_keys_to_delete.size(),
+                                       std::memory_order_relaxed);
+            }
             if (delete_rowset_data(rowsets_to_delete, 
RowsetRecyclingState::FORMAL_ROWSET,
                                    metrics_context) != 0) {
                 LOG(WARNING) << "failed to delete rowset data, instance_id=" 
<< instance_id_;
@@ -5558,6 +5749,8 @@ int InstanceRecycler::recycle_tmp_rowsets() {
 
     std::vector<std::string> tmp_rowset_keys;
     std::vector<std::string> tmp_rowset_ref_count_keys;
+    std::vector<std::string> tmp_rowset_keys_to_mark_recycled;
+    std::vector<std::string> tmp_rowset_keys_to_abort;
 
     // rowset_id -> rowset_meta
     // store tmp_rowset id and meta for statistics rs size when delete
@@ -5570,7 +5763,8 @@ int InstanceRecycler::recycle_tmp_rowsets() {
 
     auto handle_rowset_kv = [&num_scanned, &num_expired, &tmp_rowset_keys, 
&tmp_rowsets,
                              &expired_rowset_size, &total_rowset_key_size, 
&total_rowset_value_size,
-                             &earlest_ts, &tmp_rowset_ref_count_keys, this,
+                             &earlest_ts, &tmp_rowset_ref_count_keys,
+                             &tmp_rowset_keys_to_mark_recycled, 
&tmp_rowset_keys_to_abort, this,
                              &metrics_context](std::string_view k, 
std::string_view v) -> int {
         ++num_scanned;
         total_rowset_key_size += k.size();
@@ -5591,33 +5785,23 @@ int InstanceRecycler::recycle_tmp_rowsets() {
         }
 
         if (config::enable_mark_delete_rowset_before_recycle) {
-            int mark_ret = mark_rowset_as_recycled(txn_kv_.get(), 
instance_id_, k, rowset);
-            if (mark_ret == -1) {
-                LOG(WARNING) << "failed to mark rowset as recycled, 
instance_id=" << instance_id_
-                             << " tablet_id=" << rowset.tablet_id() << " 
version=["
-                             << rowset.start_version() << '-' << 
rowset.end_version() << "]";
-                return -1;
-            } else if (mark_ret == 1) {
-                LOG(INFO)
-                        << "rowset already marked as recycled, recycler will 
delete data and kv at "
-                           "next turn, instance_id="
-                        << instance_id_ << " tablet_id=" << rowset.tablet_id() 
<< " version=["
-                        << rowset.start_version() << '-' << 
rowset.end_version() << "]";
+            if (need_mark_rowset_as_recycled(rowset)) {
+                tmp_rowset_keys_to_mark_recycled.emplace_back(k);
+                LOG(INFO) << "rowset queued to mark as recycled, recycler will 
delete data and kv "
+                             "at next turn, instance_id="
+                          << instance_id_ << " tablet_id=" << 
rowset.tablet_id() << " version=["
+                          << rowset.start_version() << '-' << 
rowset.end_version() << "]";
                 return 0;
             }
         }
 
         if (config::enable_abort_txn_and_job_for_delete_rowset_before_recycle) 
{
-            LOG(INFO) << "begin to abort txn or job for related rowset, 
instance_id="
-                      << instance_id_ << " tablet_id=" << rowset.tablet_id() 
<< " version=["
-                      << rowset.start_version() << '-' << rowset.end_version() 
<< "]";
-
-            int ret = abort_txn_or_job_for_recycle(rowset);
-            if (ret != 0) {
-                LOG(WARNING) << "failed to abort txn or job for related 
rowset, instance_id="
-                             << instance_id_ << " tablet_id=" << 
rowset.tablet_id() << " version=["
-                             << rowset.start_version() << '-' << 
rowset.end_version() << "]";
-                return ret;
+            if (make_deferred_abort_task(rowset).has_value()) {
+                LOG(INFO) << "rowset queued to abort related txn or job after 
current scan batch, "
+                             "instance_id="
+                          << instance_id_ << " tablet_id=" << 
rowset.tablet_id() << " version=["
+                          << rowset.start_version() << '-' << 
rowset.end_version() << "]";
+                tmp_rowset_keys_to_abort.emplace_back(k);
             }
         }
 
@@ -5682,14 +5866,36 @@ int InstanceRecycler::recycle_tmp_rowsets() {
     };
 
     auto loop_done = [&]() -> int {
-        DORIS_CLOUD_DEFER {
-            tmp_rowset_keys.clear();
-            tmp_rowsets.clear();
-            tmp_rowset_ref_count_keys.clear();
-        };
-        worker_pool->submit([&, tmp_rowset_keys_to_delete = tmp_rowset_keys,
-                             tmp_rowsets_to_delete = tmp_rowsets,
-                             tmp_rowset_ref_count_keys_to_delete = 
tmp_rowset_ref_count_keys]() {
+        std::vector<std::string> tmp_rowset_keys_to_delete;
+        std::vector<std::string> tmp_rowset_ref_count_keys_to_delete;
+        std::vector<std::string> mark_keys_to_process;
+        std::vector<std::string> abort_keys_to_process;
+        std::map<std::string, doris::RowsetMetaCloudPB> tmp_rowsets_to_delete;
+        tmp_rowset_keys_to_delete.swap(tmp_rowset_keys);
+        tmp_rowsets_to_delete.swap(tmp_rowsets);
+        tmp_rowset_ref_count_keys_to_delete.swap(tmp_rowset_ref_count_keys);
+        mark_keys_to_process.swap(tmp_rowset_keys_to_mark_recycled);
+        abort_keys_to_process.swap(tmp_rowset_keys_to_abort);
+        worker_pool->submit([&, tmp_rowset_keys_to_delete = 
std::move(tmp_rowset_keys_to_delete),
+                             tmp_rowsets_to_delete = 
std::move(tmp_rowsets_to_delete),
+                             tmp_rowset_ref_count_keys_to_delete =
+                                     
std::move(tmp_rowset_ref_count_keys_to_delete),
+                             mark_keys_to_process = 
std::move(mark_keys_to_process),
+                             abort_keys_to_process = 
std::move(abort_keys_to_process)]() mutable {
+            if (!mark_keys_to_process.empty() &&
+                
batch_mark_rowsets_as_recycled<RowsetMetaCloudPB>(txn_kv_.get(), instance_id_,
+                                                                  
mark_keys_to_process) != 0) {
+                LOG(WARNING) << "failed to batch mark tmp rowsets as recycled, 
instance_id="
+                             << instance_id_;
+                return;
+            }
+            if (!abort_keys_to_process.empty() &&
+                
batch_abort_txn_or_job_for_recycle<RowsetMetaCloudPB>(abort_keys_to_process,
+                                                                      false) 
!= 0) {
+                LOG(WARNING) << "failed to batch abort txn or job for releated 
rowset, instance_id="
+                             << instance_id_;
+                return;
+            }
             if (delete_rowset_data(tmp_rowsets_to_delete, 
RowsetRecyclingState::TMP_ROWSET,
                                    metrics_context) != 0) {
                 LOG(WARNING) << "failed to delete tmp rowset data, 
instance_id=" << instance_id_;
@@ -5715,7 +5921,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
                 LOG(WARNING) << "failed to tmp rowset ref count kv, 
instance_id=" << instance_id_;
                 return;
             }
-            num_recycled += tmp_rowset_keys.size();
+            num_recycled += tmp_rowset_keys_to_delete.size();
             return;
         });
         return 0;
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 2c3b60042e6..bf04f2f5758 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -583,7 +583,8 @@ private:
     int abort_job_for_related_rowset(const RowsetMetaCloudPB& rowset_meta);
 
     template <typename T>
-    int abort_txn_or_job_for_recycle(T& rowset_meta_pb);
+    int batch_abort_txn_or_job_for_recycle(const std::vector<std::string>& 
keys,
+                                           bool skip_base_version);
 
 private:
     std::atomic_bool stopped_ {false};


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to