This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 7247a23ec33 branch-3.0: [fix](cloud) potential data race when retrying prepare/commit rowset for load #51129 (#51483) 7247a23ec33 is described below commit 7247a23ec3328a4c9825eaadbb07a26780277598 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Wed Jun 11 10:59:56 2025 +0800 branch-3.0: [fix](cloud) potential data race when retrying prepare/commit rowset for load #51129 (#51483) Cherry-picked from #51129 Co-authored-by: Xin Liao <liao...@selectdb.com> --- cloud/src/common/config.h | 3 + cloud/src/meta-service/meta_service.cpp | 97 ++++++++++++++++++++++++++++- cloud/src/meta-service/meta_service_txn.cpp | 1 + cloud/test/meta_service_test.cpp | 67 ++++++++++++++++++++ gensrc/proto/cloud.proto | 1 + 5 files changed, 168 insertions(+), 1 deletion(-) diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 1e40c5cfb25..3bb791f71e9 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -219,6 +219,9 @@ CONF_String(kerberos_krb5_conf_path, "/etc/krb5.conf"); CONF_mBool(enable_distinguish_hdfs_path, "true"); +// If enabled, the txn status will be checked when preparing/committing rowsets +CONF_mBool(enable_load_txn_status_check, "true"); + // Declare a selection strategy for those servers have many ips. // Note that there should at most one ip match this list. // this is a list in semicolon-delimited format, in CIDR notation, diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 45e64f475f6..d26e2fde1ec 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -991,6 +991,73 @@ static void fill_schema_from_dict(MetaServiceCode& code, std::string& msg, existed_rowset_meta->CopyFrom(metas.Get(0)); } +/** +* Check if the transaction status is as expected. 
+* If the transaction is not in the expected state, return false and set the error code and message. +* +* @param expect_status The expected transaction status. +* @param txn Pointer to the transaction object. +* @param instance_id The instance ID associated with the transaction. +* @param txn_id The transaction ID to check. +* @param code Reference to the error code to be set in case of failure. +* @param msg Reference to the error message to be set in case of failure. +* @return true if the transaction status matches the expected status, false otherwise. + */ +static bool check_transaction_status(TxnStatusPB expect_status, Transaction* txn, + const std::string& instance_id, int64_t txn_id, + MetaServiceCode& code, std::string& msg) { + // Get db id with txn id + std::string index_val; + const std::string index_key = txn_index_key({instance_id, txn_id}); + TxnErrorCode err = txn->get(index_key, &index_val); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id, err); + return false; + } + + TxnIndexPB index_pb; + if (!index_pb.ParseFromString(index_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id); + return false; + } + + DCHECK(index_pb.has_tablet_index() == true); + DCHECK(index_pb.tablet_index().has_db_id() == true); + if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) { + LOG(WARNING) << fmt::format( + "txn_index_pb is malformed, tablet_index has no db_id, txn_id={}", txn_id); + code = MetaServiceCode::INVALID_ARGUMENT; + msg = fmt::format("has no db_id in TxnIndexPB, txn_id={}", txn_id); + return false; + } + auto db_id = index_pb.tablet_index().db_id(); + txn_id = index_pb.has_parent_txn_id() ? 
index_pb.parent_txn_id() : txn_id; + + const std::string info_key = txn_info_key({instance_id, db_id, txn_id}); + std::string info_val; + err = txn->get(info_key, &info_val); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + msg = fmt::format("failed to get txn, txn_id={}, err={}", txn_id, err); + return false; + } + TxnInfoPB txn_info; + if (!txn_info.ParseFromString(info_val)) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = fmt::format("failed to parse txn_info, db_id={} txn_id={}", db_id, txn_id); + return false; + } + if (txn_info.status() != expect_status) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = fmt::format("txn is not in {} state, txn_id={}, txn_status={}", expect_status, txn_id, + txn_info.status()); + return false; + } + return true; +} + /** * 1. Check and confirm tmp rowset kv does not exist * 2. Construct recycle rowset kv which contains object path @@ -1033,6 +1100,20 @@ void MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll return; } + // Check if the prepare rowset request is invalid. + // If the transaction has been finished, it means this prepare rowset is a timeout retry request. + // In this case, do not write the recycle key again, otherwise it may cause data loss. + // If the rowset has load id, it means it is a load request, otherwise it is a + // compaction/sc request. + if (config::enable_load_txn_status_check && rowset_meta.has_load_id() && + !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(), instance_id, + rowset_meta.txn_id(), code, msg)) { + LOG(WARNING) << "prepare rowset failed, txn_id=" << rowset_meta.txn_id() + << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id + << ", rowset_state=" << rowset_meta.rowset_state() << ", msg=" << msg; + return; + } + // Check if commit key already exists. 
std::string val; err = txn->get(tmp_rs_key, &val); @@ -1156,6 +1237,20 @@ void MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle return; } + // Check if the commit rowset request is invalid. + // If the transaction has been finished, it means this commit rowset is a timeout retry request. + // In this case, do not write the recycle key again, otherwise it may cause data loss. + // If the rowset has load id, it means it is a load request, otherwise it is a + // compaction/sc request. + if (config::enable_load_txn_status_check && rowset_meta.has_load_id() && + !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(), instance_id, + rowset_meta.txn_id(), code, msg)) { + LOG(WARNING) << "commit rowset failed, txn_id=" << rowset_meta.txn_id() + << ", tablet_id=" << tablet_id << ", rowset_id=" << rowset_id + << ", rowset_state=" << rowset_meta.rowset_state() << ", msg=" << msg; + return; + } + // Check if commit key already exists. std::string existed_commit_val; err = txn->get(tmp_rs_key, &existed_commit_val); @@ -2787,4 +2882,4 @@ void MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* control response->mutable_schema_dict()->Swap(&schema_dict); } -} // namespace doris::cloud +} // namespace doris::cloud \ No newline at end of file diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 21e7dbe1a0c..490c7bb616b 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -3190,6 +3190,7 @@ void MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controlle std::string index_val; TxnIndexPB index_pb; index_pb.mutable_tablet_index()->set_db_id(db_id); + index_pb.set_parent_txn_id(txn_id); if (!index_pb.SerializeToString(&index_val)) { code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR; ss << "failed to serialize txn_index_pb " diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp 
index 0e8e543c494..bd593ce783c 100644 --- a/cloud/test/meta_service_test.cpp +++ b/cloud/test/meta_service_test.cpp @@ -8930,4 +8930,71 @@ TEST(MetaServiceTest, AddObjInfoWithRole) { SyncPoint::get_instance()->disable_processing(); SyncPoint::get_instance()->clear_all_call_backs(); } + +TEST(MetaServiceTest, StalePrepareRowset) { + auto meta_service = get_meta_service(); + + int64_t table_id = 1; + int64_t partition_id = 1; + int64_t tablet_id = 1; + int64_t db_id = 100201; + std::string label = "test_prepare_rowset"; + create_tablet(meta_service.get(), table_id, 1, partition_id, tablet_id); + + int64_t txn_id = 0; + ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id)); + CreateRowsetResponse res; + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + rowset.mutable_load_id()->set_hi(123); + rowset.mutable_load_id()->set_lo(456); + prepare_rowset(meta_service.get(), rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label; + res.Clear(); + ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res)); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label; + + prepare_rowset(meta_service.get(), rowset, res); + ASSERT_TRUE(res.status().msg().find("rowset already exists") != std::string::npos) + << res.status().msg(); + ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) << res.status().code(); + + commit_txn(meta_service.get(), db_id, txn_id, label); + prepare_rowset(meta_service.get(), rowset, res); + ASSERT_TRUE(res.status().msg().find("txn is not in") != std::string::npos) + << res.status().msg(); + ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().code(); +} + +TEST(MetaServiceTest, StaleCommitRowset) { + auto meta_service = get_meta_service(); + + int64_t table_id = 1; + int64_t partition_id = 1; + int64_t tablet_id = 1; + int64_t db_id = 100201; + std::string label = "test_prepare_rowset"; + create_tablet(meta_service.get(), table_id, 
1, partition_id, tablet_id); + + int64_t txn_id = 0; + ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id)); + CreateRowsetResponse res; + auto rowset = create_rowset(txn_id, tablet_id, partition_id); + rowset.mutable_load_id()->set_hi(123); + rowset.mutable_load_id()->set_lo(456); + prepare_rowset(meta_service.get(), rowset, res); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label; + res.Clear(); + ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res)); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label; + + ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res)); + ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label; + + commit_txn(meta_service.get(), db_id, txn_id, label); + ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res)); + ASSERT_TRUE(res.status().msg().find("txn is not in") != std::string::npos) + << res.status().msg(); + ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().code(); +} + } // namespace doris::cloud diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 1b6cdd9bd5a..fa1336a2e7a 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -385,6 +385,7 @@ message TxnLabelPB { // txn_id -> db_id message TxnIndexPB { optional TabletIndexPB tablet_index = 1; + optional int64 parent_txn_id = 2; } message TxnInfoPB { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org