This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 31ddc47e231 branch-4.0: [improvement](recycler) Avoid single-point read/write during sequentially reading key #62476 (#63122)
31ddc47e231 is described below
commit 31ddc47e2312464b882a687f91593c81c3e60160
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri May 15 21:10:25 2026 -0700
branch-4.0: [improvement](recycler) Avoid single-point read/write during sequentially reading key #62476 (#63122)
Cherry-picked from #62476
Co-authored-by: Yixuan Wang <[email protected]>
---
cloud/src/recycler/recycler.cpp | 446 +++++++++++++++++++++++++++++-----------
cloud/src/recycler/recycler.h | 3 +-
2 files changed, 328 insertions(+), 121 deletions(-)
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index 59f43596037..aab1daba813 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -36,6 +36,7 @@
#include <initializer_list>
#include <memory>
#include <numeric>
+#include <optional>
#include <random>
#include <string>
#include <string_view>
@@ -1838,90 +1839,243 @@ int
InstanceRecycler::abort_job_for_related_rowset(const RowsetMetaCloudPB& rows
}
template <typename T>
-int InstanceRecycler::abort_txn_or_job_for_recycle(T& rowset_meta_pb) {
- RowsetMetaCloudPB* rs_meta;
- RecycleRowsetPB::Type rowset_type = RecycleRowsetPB::PREPARE;
+RowsetMetaCloudPB* mutable_rowset_meta(T& rowset_meta_pb) {
+ if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
+ return rowset_meta_pb.mutable_rowset_meta();
+ } else {
+ return &rowset_meta_pb;
+ }
+}
+template <typename T>
+const RowsetMetaCloudPB& rowset_meta(const T& rowset_meta_pb) {
if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
- // For keys that are not in the RecycleRowsetPB::PREPARE state
- // we do not need to check the job or txn state
- // because tmp_rowset_key already exists when this key is generated.
- rowset_type = rowset_meta_pb.type();
- rs_meta = rowset_meta_pb.mutable_rowset_meta();
+ return rowset_meta_pb.rowset_meta();
} else {
- rs_meta = &rowset_meta_pb;
+ return rowset_meta_pb;
}
+}
+
+struct DeferredRecycleAbortTask {
+ enum class Type : uint8_t {
+ TXN,
+ JOB,
+ };
- DCHECK(rs_meta != nullptr);
+ Type type = Type::TXN;
+ int64_t txn_id = 0;
+ int64_t tablet_id = 0;
+ int64_t start_version = 0;
+ int64_t end_version = 0;
+ std::string rowset_id;
+ std::string job_id;
+};
+
+struct DeferredRecyclePrepareDeleteTask {
+ std::string key;
+ std::string resource_id;
+ std::string rowset_id;
+ int64_t tablet_id = 0;
+};
- // compaction/sc will generate recycle_rowset_key for each input rowset
with load_id
- // we need skip them because the related txn has been finished
- // load_rowset1 load_rowset2 => pick for compaction => compact_rowset
- // compact_rowset1 compact_rowset2 => pick for compaction/sc job =>
new_rowset
- if (rowset_type == RecycleRowsetPB::PREPARE) {
- if (rs_meta->has_load_id()) {
- // load
- return abort_txn_for_related_rowset(rs_meta->txn_id());
- } else if (rs_meta->has_job_id()) {
- // compaction / schema change
- return abort_job_for_related_rowset(*rs_meta);
+template <typename T>
+std::optional<DeferredRecycleAbortTask> make_deferred_abort_task(const T&
rowset_meta_pb) {
+ if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
+ if (rowset_meta_pb.type() != RecycleRowsetPB::PREPARE) {
+ return std::nullopt;
}
}
- return 0;
+ const auto& rs_meta = rowset_meta(rowset_meta_pb);
+ DeferredRecycleAbortTask task;
+ task.tablet_id = rs_meta.tablet_id();
+ task.start_version = rs_meta.start_version();
+ task.end_version = rs_meta.end_version();
+ if (rs_meta.has_load_id()) {
+ task.type = DeferredRecycleAbortTask::Type::TXN;
+ task.txn_id = rs_meta.txn_id();
+ return task;
+ }
+ if (rs_meta.has_job_id()) {
+ task.type = DeferredRecycleAbortTask::Type::JOB;
+ task.rowset_id = rs_meta.rowset_id_v2();
+ task.job_id = rs_meta.job_id();
+ return task;
+ }
+ return std::nullopt;
}
template <typename T>
-int mark_rowset_as_recycled(TxnKv* txn_kv, const std::string& instance_id,
std::string_view key,
- T& rowset_meta_pb) {
- RowsetMetaCloudPB* rs_meta;
+bool need_mark_rowset_as_recycled(const T& rowset_meta_pb) {
+ const auto& rs_meta = rowset_meta(rowset_meta_pb);
+ return !rs_meta.has_is_recycled() || !rs_meta.is_recycled();
+}
- if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
- rs_meta = rowset_meta_pb.mutable_rowset_meta();
- } else {
- rs_meta = &rowset_meta_pb;
+template <typename T>
+int batch_mark_rowsets_as_recycled(TxnKv* txn_kv, const std::string&
instance_id,
+ const std::vector<std::string>& keys) {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to create txn, instance_id=" << instance_id;
+ return -1;
}
-
- bool need_write_back = false;
- if ((!rs_meta->has_is_recycled() || !rs_meta->is_recycled())) {
- need_write_back = true;
- rs_meta->set_is_recycled(true);
+ std::vector<std::optional<std::string>> values;
+ err = txn->batch_get(&values, keys);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to batch get rowset meta, instance_id=" <<
instance_id << ' '
+ << "keys size=" << keys.size() << ' ' << "err=" << err;
+ return -1;
+ }
+ size_t total_keys = keys.size();
+ for (size_t i = 0; i < total_keys; i++) {
+ if (!values[i].has_value()) {
+ // has already been removed by commit_rowset
+ continue;
+ }
+ auto key = keys[i];
+ auto val = values[i].value();
+ T rowset_meta_pb;
+ if (!rowset_meta_pb.ParseFromString(val)) {
+ LOG(WARNING) << "failed to parse rowset meta, instance_id=" <<
instance_id
+ << " key=" << hex(key);
+ return -1;
+ }
+ if (!need_mark_rowset_as_recycled(rowset_meta_pb)) {
+ continue;
+ }
+ mutable_rowset_meta(rowset_meta_pb)->set_is_recycled(true);
+ val.clear();
+ rowset_meta_pb.SerializeToString(&val);
+ txn->put(key, val);
+ }
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to commit txn, instance_id=" << instance_id;
+ return -1;
}
- if (need_write_back) {
+ return 0;
+}
+
+template <typename T>
+int collect_deferred_abort_tasks(TxnKv* txn_kv, const std::string& instance_id,
+ const std::vector<std::string>& keys,
+ std::vector<DeferredRecycleAbortTask>*
abort_tasks,
+ bool skip_base_version) {
+ constexpr size_t kAbortCheckBatchSize = 256;
+ for (size_t offset = 0; offset < keys.size(); offset +=
kAbortCheckBatchSize) {
+ size_t limit = std::min(keys.size(), offset + kAbortCheckBatchSize);
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
LOG(WARNING) << "failed to create txn, instance_id=" <<
instance_id;
return -1;
}
- // double check becase of new transaction
- T rowset_meta;
- std::string val;
- err = txn->get(key, &val);
- if (!rowset_meta.ParseFromString(val)) {
- LOG(WARNING) << "failed to parse rs_meta, instance_id=" <<
instance_id;
- return -1;
+ for (size_t idx = offset; idx < limit; ++idx) {
+ const std::string& key = keys[idx];
+ std::string val;
+ err = txn->get(key, &val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ // has already been removed
+ continue;
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to get rowset meta, instance_id=" <<
instance_id
+ << " key=" << hex(key);
+ return -1;
+ }
+ T rowset_meta_pb;
+ if (!rowset_meta_pb.ParseFromString(val)) {
+ LOG(WARNING) << "failed to parse rowset meta, instance_id=" <<
instance_id
+ << " key=" << hex(key);
+ return -1;
+ }
+ if (skip_base_version && rowset_meta(rowset_meta_pb).end_version()
== 1) {
+ continue;
+ }
+ if (auto abort_task = make_deferred_abort_task(rowset_meta_pb);
+ abort_task.has_value()) {
+ abort_tasks->emplace_back(std::move(*abort_task));
+ }
}
- if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
- rs_meta = rowset_meta.mutable_rowset_meta();
+ }
+ return 0;
+}
+
+template <typename T>
+int InstanceRecycler::batch_abort_txn_or_job_for_recycle(const
std::vector<std::string>& keys,
+ bool
skip_base_version) {
+ std::vector<DeferredRecycleAbortTask> abort_tasks;
+ if (collect_deferred_abort_tasks<T>(txn_kv_.get(), instance_id_, keys,
&abort_tasks,
+ skip_base_version) != 0) {
+ LOG(WARNING) << "failed to collect rowset abort tasks, instance_id="
<< instance_id_;
+ return -1;
+ }
+ for (const auto& abort_task : abort_tasks) {
+ LOG(INFO) << "begin to abort txn or job for related rowset,
instance_id=" << instance_id_
+ << " tablet_id=" << abort_task.tablet_id << " version=["
+ << abort_task.start_version << '-' << abort_task.end_version
<< "]";
+ int abort_ret = 0;
+ if (abort_task.type == DeferredRecycleAbortTask::Type::TXN) {
+ abort_ret = abort_txn_for_related_rowset(abort_task.txn_id);
} else {
- rs_meta = &rowset_meta;
+ RowsetMetaCloudPB rowset_meta;
+ rowset_meta.set_tablet_id(abort_task.tablet_id);
+ rowset_meta.set_rowset_id_v2(abort_task.rowset_id);
+ rowset_meta.set_job_id(abort_task.job_id);
+ abort_ret = abort_job_for_related_rowset(rowset_meta);
}
- if ((rs_meta->has_is_recycled() && rs_meta->is_recycled())) {
- return 0;
+ if (abort_ret != 0) {
+ LOG(WARNING) << "failed to abort txn or job for related rowset,
instance_id="
+ << instance_id_ << " tablet_id=" <<
abort_task.tablet_id << " version=["
+ << abort_task.start_version << '-' <<
abort_task.end_version << "]";
+ return abort_ret;
}
- rs_meta->set_is_recycled(true);
- val.clear();
- rowset_meta.SerializeToString(&val);
- txn->put(key, val);
- err = txn->commit();
+ }
+ return 0;
+}
+
+int collect_prepare_delete_tasks(TxnKv* txn_kv, const std::string& instance_id,
+ const std::vector<std::string>& keys,
+
std::vector<DeferredRecyclePrepareDeleteTask>* delete_tasks) {
+ constexpr size_t kPrepareCheckBatchSize = 256;
+ for (size_t offset = 0; offset < keys.size(); offset +=
kPrepareCheckBatchSize) {
+ size_t limit = std::min(keys.size(), offset + kPrepareCheckBatchSize);
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
- LOG(WARNING) << "failed to commit txn, instance_id=" <<
instance_id;
+ LOG(WARNING) << "failed to create txn, instance_id=" <<
instance_id;
return -1;
}
+ for (size_t idx = offset; idx < limit; ++idx) {
+ const std::string& key = keys[idx];
+ std::string val;
+ err = txn->get(key, &val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ // has already been removed
+ continue;
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to get recycle rowset, instance_id="
<< instance_id
+ << " key=" << hex(key);
+ return -1;
+ }
+ RecycleRowsetPB rowset;
+ if (!rowset.ParseFromString(val)) {
+ LOG(WARNING) << "failed to parse recycle rowset, instance_id="
<< instance_id
+ << " key=" << hex(key);
+ return -1;
+ }
+ if (rowset.type() != RecycleRowsetPB::PREPARE) {
+ continue;
+ }
+ const auto& rs_meta = rowset.rowset_meta();
+ delete_tasks->push_back(
+ {key, rs_meta.resource_id(), rs_meta.rowset_id_v2(),
rs_meta.tablet_id()});
+ }
}
- return need_write_back ? 1 : 0;
+ return 0;
}
int InstanceRecycler::recycle_ref_rowsets(bool* has_unrecycled_rowsets) {
@@ -4728,6 +4882,9 @@ int InstanceRecycler::recycle_rowsets() {
};
std::vector<std::string> rowset_keys;
+ std::vector<std::string> rowset_keys_to_mark_recycled;
+ std::vector<std::string> rowset_keys_to_abort;
+ std::vector<std::string> prepare_rowset_keys_to_delete;
// rowset_id -> rowset_meta
// store rowset id and meta for statistics rs size when delete
std::map<std::string, doris::RowsetMetaCloudPB> rowsets;
@@ -4854,38 +5011,26 @@ int InstanceRecycler::recycle_rowsets() {
auto* rowset_meta = rowset.mutable_rowset_meta();
if (config::enable_mark_delete_rowset_before_recycle) {
- int mark_ret = mark_rowset_as_recycled(txn_kv_.get(),
instance_id_, k, rowset);
- if (mark_ret == -1) {
- LOG(WARNING) << "failed to mark rowset as recycled,
instance_id=" << instance_id_
- << " tablet_id=" << rowset_meta->tablet_id() << "
version=["
- << rowset_meta->start_version() << '-' <<
rowset_meta->end_version()
- << "]";
- return -1;
- } else if (mark_ret == 1) {
- LOG(INFO)
- << "rowset already marked as recycled, recycler will
delete data and kv at "
- "next turn, instance_id="
- << instance_id_ << " tablet_id=" <<
rowset_meta->tablet_id() << " version=["
- << rowset_meta->start_version() << '-' <<
rowset_meta->end_version() << "]";
+ if (need_mark_rowset_as_recycled(rowset)) {
+ rowset_keys_to_mark_recycled.emplace_back(k);
+ LOG(INFO) << "rowset queued to mark as recycled, recycler will
delete data and kv "
+ "at next turn, instance_id="
+ << instance_id_ << " tablet_id=" <<
rowset_meta->tablet_id()
+ << " version=[" << rowset_meta->start_version() <<
'-'
+ << rowset_meta->end_version() << "]";
return 0;
}
}
- if (config::enable_abort_txn_and_job_for_delete_rowset_before_recycle)
{
- LOG(INFO) << "begin to abort txn or job for related rowset,
instance_id="
- << instance_id_ << " tablet_id=" <<
rowset_meta->tablet_id() << " version=["
- << rowset_meta->start_version() << '-' <<
rowset_meta->end_version() << "]";
-
- if (rowset_meta->end_version() != 1) {
- int ret = abort_txn_or_job_for_recycle(rowset);
-
- if (ret != 0) {
- LOG(WARNING) << "failed to abort txn or job for related
rowset, instance_id="
- << instance_id_ << " tablet_id=" <<
rowset.tablet_id()
- << " version=[" <<
rowset_meta->start_version() << '-'
- << rowset_meta->end_version() << "]";
- return ret;
- }
+ if (config::enable_abort_txn_and_job_for_delete_rowset_before_recycle
&&
+ rowset_meta->end_version() != 1) {
+ if (make_deferred_abort_task(rowset).has_value()) {
+ LOG(INFO) << "rowset queued to abort related txn or job after
current scan batch, "
+ "instance_id="
+ << instance_id_ << " tablet_id=" <<
rowset_meta->tablet_id()
+ << " version=[" << rowset_meta->start_version() <<
'-'
+ << rowset_meta->end_version() << "]";
+ rowset_keys_to_abort.emplace_back(k);
}
}
@@ -4911,11 +5056,7 @@ int InstanceRecycler::recycle_rowsets() {
if (rowset.type() == RecycleRowsetPB::PREPARE) {
// unable to calculate file path, can only be deleted by rowset id
prefix
num_prepare += 1;
- if (delete_rowset_data_by_prefix(std::string(k),
rowset_meta->resource_id(),
- rowset_meta->tablet_id(),
- rowset_meta->rowset_id_v2()) !=
0) {
- return -1;
- }
+ prepare_rowset_keys_to_delete.emplace_back(k);
} else {
num_compacted += rowset.type() == RecycleRowsetPB::COMPACT;
rowset_keys.emplace_back(k);
@@ -4929,13 +5070,63 @@ int InstanceRecycler::recycle_rowsets() {
auto loop_done = [&]() -> int {
std::vector<std::string> rowset_keys_to_delete;
+ std::vector<std::string> mark_keys_to_process;
+ std::vector<std::string> abort_keys_to_process;
+ std::vector<std::string> prepare_keys_to_process;
// rowset_id -> rowset_meta
// store rowset id and meta for statistics rs size when delete
std::map<std::string, doris::RowsetMetaCloudPB> rowsets_to_delete;
rowset_keys_to_delete.swap(rowset_keys);
+ mark_keys_to_process.swap(rowset_keys_to_mark_recycled);
+ abort_keys_to_process.swap(rowset_keys_to_abort);
+ prepare_keys_to_process.swap(prepare_rowset_keys_to_delete);
rowsets_to_delete.swap(rowsets);
worker_pool->submit([&, rowset_keys_to_delete =
std::move(rowset_keys_to_delete),
- rowsets_to_delete =
std::move(rowsets_to_delete)]() {
+ rowsets_to_delete = std::move(rowsets_to_delete),
+ prepare_keys_to_process =
std::move(prepare_keys_to_process),
+ mark_keys_to_process =
std::move(mark_keys_to_process),
+ abort_keys_to_process =
std::move(abort_keys_to_process)]() mutable {
+ if (!mark_keys_to_process.empty() &&
+ batch_mark_rowsets_as_recycled<RecycleRowsetPB>(txn_kv_.get(),
instance_id_,
+
mark_keys_to_process) != 0) {
+ LOG(WARNING) << "failed to batch mark recycle rowsets as
recycled, instance_id="
+ << instance_id_;
+ return;
+ }
+ if (!abort_keys_to_process.empty() &&
+
batch_abort_txn_or_job_for_recycle<RecycleRowsetPB>(abort_keys_to_process,
true) !=
+ 0) {
+ return;
+ }
+ std::vector<DeferredRecyclePrepareDeleteTask> prepare_delete_tasks;
+ if (!prepare_keys_to_process.empty() &&
+ collect_prepare_delete_tasks(txn_kv_.get(), instance_id_,
prepare_keys_to_process,
+ &prepare_delete_tasks) != 0) {
+ LOG(WARNING) << "failed to collect prepare rowset delete
tasks, instance_id="
+ << instance_id_;
+ return;
+ }
+ if (!prepare_delete_tasks.empty()) {
+ std::vector<std::string> prepare_rowset_keys_to_delete;
+
prepare_rowset_keys_to_delete.reserve(prepare_delete_tasks.size());
+ for (const auto& task : prepare_delete_tasks) {
+ if (delete_rowset_data(task.resource_id, task.tablet_id,
task.rowset_id) != 0) {
+ LOG(WARNING) << "failed to delete rowset data, key="
<< hex(task.key);
+ return;
+ }
+ if (delete_versioned_delete_bitmap_kvs(task.tablet_id,
task.rowset_id) != 0) {
+ return;
+ }
+ prepare_rowset_keys_to_delete.emplace_back(task.key);
+ }
+ if (txn_remove(txn_kv_.get(), prepare_rowset_keys_to_delete)
!= 0) {
+ LOG(WARNING) << "failed to delete recycle rowset kv,
instance_id="
+ << instance_id_;
+ return;
+ }
+ num_recycled.fetch_add(prepare_rowset_keys_to_delete.size(),
+ std::memory_order_relaxed);
+ }
if (delete_rowset_data(rowsets_to_delete,
RowsetRecyclingState::FORMAL_ROWSET,
metrics_context) != 0) {
LOG(WARNING) << "failed to delete rowset data, instance_id="
<< instance_id_;
@@ -5558,6 +5749,8 @@ int InstanceRecycler::recycle_tmp_rowsets() {
std::vector<std::string> tmp_rowset_keys;
std::vector<std::string> tmp_rowset_ref_count_keys;
+ std::vector<std::string> tmp_rowset_keys_to_mark_recycled;
+ std::vector<std::string> tmp_rowset_keys_to_abort;
// rowset_id -> rowset_meta
// store tmp_rowset id and meta for statistics rs size when delete
@@ -5570,7 +5763,8 @@ int InstanceRecycler::recycle_tmp_rowsets() {
auto handle_rowset_kv = [&num_scanned, &num_expired, &tmp_rowset_keys,
&tmp_rowsets,
&expired_rowset_size, &total_rowset_key_size,
&total_rowset_value_size,
- &earlest_ts, &tmp_rowset_ref_count_keys, this,
+ &earlest_ts, &tmp_rowset_ref_count_keys,
+ &tmp_rowset_keys_to_mark_recycled,
&tmp_rowset_keys_to_abort, this,
&metrics_context](std::string_view k,
std::string_view v) -> int {
++num_scanned;
total_rowset_key_size += k.size();
@@ -5591,33 +5785,23 @@ int InstanceRecycler::recycle_tmp_rowsets() {
}
if (config::enable_mark_delete_rowset_before_recycle) {
- int mark_ret = mark_rowset_as_recycled(txn_kv_.get(),
instance_id_, k, rowset);
- if (mark_ret == -1) {
- LOG(WARNING) << "failed to mark rowset as recycled,
instance_id=" << instance_id_
- << " tablet_id=" << rowset.tablet_id() << "
version=["
- << rowset.start_version() << '-' <<
rowset.end_version() << "]";
- return -1;
- } else if (mark_ret == 1) {
- LOG(INFO)
- << "rowset already marked as recycled, recycler will
delete data and kv at "
- "next turn, instance_id="
- << instance_id_ << " tablet_id=" << rowset.tablet_id()
<< " version=["
- << rowset.start_version() << '-' <<
rowset.end_version() << "]";
+ if (need_mark_rowset_as_recycled(rowset)) {
+ tmp_rowset_keys_to_mark_recycled.emplace_back(k);
+ LOG(INFO) << "rowset queued to mark as recycled, recycler will
delete data and kv "
+ "at next turn, instance_id="
+ << instance_id_ << " tablet_id=" <<
rowset.tablet_id() << " version=["
+ << rowset.start_version() << '-' <<
rowset.end_version() << "]";
return 0;
}
}
if (config::enable_abort_txn_and_job_for_delete_rowset_before_recycle)
{
- LOG(INFO) << "begin to abort txn or job for related rowset,
instance_id="
- << instance_id_ << " tablet_id=" << rowset.tablet_id()
<< " version=["
- << rowset.start_version() << '-' << rowset.end_version()
<< "]";
-
- int ret = abort_txn_or_job_for_recycle(rowset);
- if (ret != 0) {
- LOG(WARNING) << "failed to abort txn or job for related
rowset, instance_id="
- << instance_id_ << " tablet_id=" <<
rowset.tablet_id() << " version=["
- << rowset.start_version() << '-' <<
rowset.end_version() << "]";
- return ret;
+ if (make_deferred_abort_task(rowset).has_value()) {
+ LOG(INFO) << "rowset queued to abort related txn or job after
current scan batch, "
+ "instance_id="
+ << instance_id_ << " tablet_id=" <<
rowset.tablet_id() << " version=["
+ << rowset.start_version() << '-' <<
rowset.end_version() << "]";
+ tmp_rowset_keys_to_abort.emplace_back(k);
}
}
@@ -5682,14 +5866,36 @@ int InstanceRecycler::recycle_tmp_rowsets() {
};
auto loop_done = [&]() -> int {
- DORIS_CLOUD_DEFER {
- tmp_rowset_keys.clear();
- tmp_rowsets.clear();
- tmp_rowset_ref_count_keys.clear();
- };
- worker_pool->submit([&, tmp_rowset_keys_to_delete = tmp_rowset_keys,
- tmp_rowsets_to_delete = tmp_rowsets,
- tmp_rowset_ref_count_keys_to_delete =
tmp_rowset_ref_count_keys]() {
+ std::vector<std::string> tmp_rowset_keys_to_delete;
+ std::vector<std::string> tmp_rowset_ref_count_keys_to_delete;
+ std::vector<std::string> mark_keys_to_process;
+ std::vector<std::string> abort_keys_to_process;
+ std::map<std::string, doris::RowsetMetaCloudPB> tmp_rowsets_to_delete;
+ tmp_rowset_keys_to_delete.swap(tmp_rowset_keys);
+ tmp_rowsets_to_delete.swap(tmp_rowsets);
+ tmp_rowset_ref_count_keys_to_delete.swap(tmp_rowset_ref_count_keys);
+ mark_keys_to_process.swap(tmp_rowset_keys_to_mark_recycled);
+ abort_keys_to_process.swap(tmp_rowset_keys_to_abort);
+ worker_pool->submit([&, tmp_rowset_keys_to_delete =
std::move(tmp_rowset_keys_to_delete),
+ tmp_rowsets_to_delete =
std::move(tmp_rowsets_to_delete),
+ tmp_rowset_ref_count_keys_to_delete =
+
std::move(tmp_rowset_ref_count_keys_to_delete),
+ mark_keys_to_process =
std::move(mark_keys_to_process),
+ abort_keys_to_process =
std::move(abort_keys_to_process)]() mutable {
+ if (!mark_keys_to_process.empty() &&
+
batch_mark_rowsets_as_recycled<RowsetMetaCloudPB>(txn_kv_.get(), instance_id_,
+
mark_keys_to_process) != 0) {
+ LOG(WARNING) << "failed to batch mark tmp rowsets as recycled,
instance_id="
+ << instance_id_;
+ return;
+ }
+ if (!abort_keys_to_process.empty() &&
+
batch_abort_txn_or_job_for_recycle<RowsetMetaCloudPB>(abort_keys_to_process,
+ false)
!= 0) {
+ LOG(WARNING) << "failed to batch abort txn or job for releated
rowset, instance_id="
+ << instance_id_;
+ return;
+ }
if (delete_rowset_data(tmp_rowsets_to_delete,
RowsetRecyclingState::TMP_ROWSET,
metrics_context) != 0) {
LOG(WARNING) << "failed to delete tmp rowset data,
instance_id=" << instance_id_;
@@ -5715,7 +5921,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
LOG(WARNING) << "failed to tmp rowset ref count kv,
instance_id=" << instance_id_;
return;
}
- num_recycled += tmp_rowset_keys.size();
+ num_recycled += tmp_rowset_keys_to_delete.size();
return;
});
return 0;
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 2c3b60042e6..bf04f2f5758 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -583,7 +583,8 @@ private:
int abort_job_for_related_rowset(const RowsetMetaCloudPB& rowset_meta);
template <typename T>
- int abort_txn_or_job_for_recycle(T& rowset_meta_pb);
+ int batch_abort_txn_or_job_for_recycle(const std::vector<std::string>&
keys,
+ bool skip_base_version);
private:
std::atomic_bool stopped_ {false};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]