This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new a236d1f04e1 branch-3.1: [Opt](cloud-mow) Do fast retry when commit
compaction job for mow tablet #52476 (#52843)
a236d1f04e1 is described below
commit a236d1f04e1750ac00877202f0ec61fcb7337d73
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jul 7 13:51:55 2025 +0800
branch-3.1: [Opt](cloud-mow) Do fast retry when commit compaction job for
mow tablet #52476 (#52843)
Cherry-picked from #52476
Co-authored-by: bobhan1 <[email protected]>
---
cloud/src/meta-service/meta_service_job.cpp | 141 +++++++++++++++-------------
1 file changed, 77 insertions(+), 64 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp
index 00d4dde51ce..c6791aa4346 100644
--- a/cloud/src/meta-service/meta_service_job.cpp
+++ b/cloud/src/meta-service/meta_service_job.cpp
@@ -1492,58 +1492,73 @@ void
MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
return;
}
- bool need_commit = false;
- TxnErrorCode err = txn_kv_->create_txn(&txn);
- if (err != TxnErrorCode::TXN_OK) {
- code = cast_as<ErrCategory::CREATE>(err);
- msg = "failed to create txn";
- return;
- }
+ for (int retry = 0; retry <= 1; retry++) {
+ bool need_commit = false;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to create txn";
+ return;
+ }
- int64_t tablet_id = request->job().idx().tablet_id();
- if (tablet_id <= 0) {
- code = MetaServiceCode::INVALID_ARGUMENT;
- msg = "no valid tablet_id given";
- return;
- }
- auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
- if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
- !tablet_idx.has_partition_id()) {
- get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id,
tablet_idx);
- if (code != MetaServiceCode::OK) return;
- }
- // Check if tablet has been dropped
- if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
- tablet_idx.partition_id())) {
- code = MetaServiceCode::TABLET_NOT_FOUND;
- msg = fmt::format("tablet {} has been dropped", tablet_id);
- return;
- }
+ int64_t tablet_id = request->job().idx().tablet_id();
+ if (tablet_id <= 0) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "no valid tablet_id given";
+ return;
+ }
+ auto& tablet_idx = const_cast<TabletIndexPB&>(request->job().idx());
+ if (!tablet_idx.has_table_id() || !tablet_idx.has_index_id() ||
+ !tablet_idx.has_partition_id()) {
+ get_tablet_idx(code, msg, txn.get(), instance_id, tablet_id,
tablet_idx);
+ if (code != MetaServiceCode::OK) return;
+ }
+ // Check if tablet has been dropped
+ if (is_dropped_tablet(txn.get(), instance_id, tablet_idx.index_id(),
+ tablet_idx.partition_id())) {
+ code = MetaServiceCode::TABLET_NOT_FOUND;
+ msg = fmt::format("tablet {} has been dropped", tablet_id);
+ return;
+ }
- // TODO(gavin): remove duplicated code with start_tablet_job()
- // Begin to process finish tablet job
- std::string job_key = job_tablet_key({instance_id, tablet_idx.table_id(),
tablet_idx.index_id(),
- tablet_idx.partition_id(),
tablet_id});
- std::string job_val;
- err = txn->get(job_key, &job_val);
- if (err != TxnErrorCode::TXN_OK) {
- SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," :
"internal error,")
- << " instance_id=" << instance_id << " tablet_id=" << tablet_id
- << " job=" << proto_to_json(request->job()) << " err=" << err;
- msg = ss.str();
- code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::INVALID_ARGUMENT
- :
cast_as<ErrCategory::READ>(err);
- return;
- }
- TabletJobInfoPB recorded_job;
- recorded_job.ParseFromString(job_val);
- VLOG_DEBUG << "get tablet job, tablet_id=" << tablet_id
- << " job=" << proto_to_json(recorded_job);
- FinishTabletJobRequest_Action action = request->action();
+ // TODO(gavin): remove duplicated code with start_tablet_job()
+ // Begin to process finish tablet job
+ std::string job_key =
+ job_tablet_key({instance_id, tablet_idx.table_id(),
tablet_idx.index_id(),
+ tablet_idx.partition_id(), tablet_id});
+ std::string job_val;
+ err = txn->get(job_key, &job_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ SS << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? "job not found," :
"internal error,")
+ << " instance_id=" << instance_id << " tablet_id=" << tablet_id
+ << " job=" << proto_to_json(request->job()) << " err=" << err;
+ msg = ss.str();
+ code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ?
MetaServiceCode::INVALID_ARGUMENT
+ :
cast_as<ErrCategory::READ>(err);
+ return;
+ }
+ TabletJobInfoPB recorded_job;
+ recorded_job.ParseFromString(job_val);
+ VLOG_DEBUG << "get tablet job, tablet_id=" << tablet_id
+ << " job=" << proto_to_json(recorded_job);
+ FinishTabletJobRequest_Action action = request->action();
+
+ std::string use_version =
+
delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id);
+ LOG(INFO) << "finish_tablet_job instance_id=" << instance_id
+ << " use_version=" << use_version;
+ if (!request->job().compaction().empty()) {
+ // Process compaction commit
+ process_compaction_job(code, msg, ss, txn, request, response,
recorded_job, instance_id,
+ job_key, need_commit, use_version);
+ } else if (request->job().has_schema_change()) {
+ // Process schema change commit
+ process_schema_change_job(code, msg, ss, txn, request, response,
recorded_job,
+ instance_id, job_key, need_commit,
use_version);
+ }
- DORIS_CLOUD_DEFER {
if (!need_commit) return;
- TxnErrorCode err = txn->commit();
+ err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
if (err == TxnErrorCode::TXN_CONFLICT) {
if (action == FinishTabletJobRequest::COMMIT) {
@@ -1553,28 +1568,26 @@ void
MetaServiceImpl::finish_tablet_job(::google::protobuf::RpcController* contr
} else if (action == FinishTabletJobRequest::ABORT) {
g_bvar_delete_bitmap_lock_txn_remove_conflict_by_compaction_abort_counter << 1;
}
+
+ if (retry == 0 && !request->job().compaction().empty() &&
+
request->job().compaction(0).has_delete_bitmap_lock_initiator()) {
+ // Do a fast retry for mow when commit compaction job.
+ // The only fdb txn conflict here is that during the
compaction job commit,
+ // a compaction lease RPC comes and finishes before the
commit,
+ // so we retry to commit the compaction job again.
+ response->Clear();
+ code = MetaServiceCode::OK;
+ msg.clear();
+ continue;
+ }
}
+
code = cast_as<ErrCategory::COMMIT>(err);
ss << "failed to commit job kv, err=" << err;
msg = ss.str();
return;
}
- };
- std::string use_version =
-
delete_bitmap_lock_white_list_->get_delete_bitmap_lock_version(instance_id);
- LOG(INFO) << "finish_tablet_job instance_id=" << instance_id << "
use_version=" << use_version;
- // Process compaction commit
- if (!request->job().compaction().empty()) {
- process_compaction_job(code, msg, ss, txn, request, response,
recorded_job, instance_id,
- job_key, need_commit, use_version);
- return;
- }
-
- // Process schema change commit
- if (request->job().has_schema_change()) {
- process_schema_change_job(code, msg, ss, txn, request, response,
recorded_job, instance_id,
- job_key, need_commit, use_version);
- return;
+ break;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org