This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7247a23ec33 branch-3.0: [fix](cloud) potential data race when retrying prepare/commit rowset for load #51129 (#51483)
7247a23ec33 is described below

commit 7247a23ec3328a4c9825eaadbb07a26780277598
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 11 10:59:56 2025 +0800

    branch-3.0: [fix](cloud) potential data race when retrying prepare/commit rowset for load #51129 (#51483)
    
    Cherry-picked from #51129
    
    Co-authored-by: Xin Liao <liao...@selectdb.com>
---
 cloud/src/common/config.h                   |  3 +
 cloud/src/meta-service/meta_service.cpp     | 97 ++++++++++++++++++++++++++++-
 cloud/src/meta-service/meta_service_txn.cpp |  1 +
 cloud/test/meta_service_test.cpp            | 67 ++++++++++++++++++++
 gensrc/proto/cloud.proto                    |  1 +
 5 files changed, 168 insertions(+), 1 deletion(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index 1e40c5cfb25..3bb791f71e9 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -219,6 +219,9 @@ CONF_String(kerberos_krb5_conf_path, "/etc/krb5.conf");
 
 CONF_mBool(enable_distinguish_hdfs_path, "true");
 
+// If enabled, prepare/commit rowset of a load is rejected unless its txn is still PREPARED
+CONF_mBool(enable_load_txn_status_check, "true");
+
 // Declare a selection strategy for those servers have many ips.
 // Note that there should at most one ip match this list.
 // this is a list in semicolon-delimited format, in CIDR notation,
diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp
index 45e64f475f6..d26e2fde1ec 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -991,6 +991,73 @@ static void fill_schema_from_dict(MetaServiceCode& code, 
std::string& msg,
     existed_rowset_meta->CopyFrom(metas.Get(0));
 }
 
+/**
+ * Check that the transaction identified by `txn_id` is in `expect_status`.
+ * For a sub txn (its TxnIndexPB has parent_txn_id) the parent's TxnInfoPB is checked.
+ *
+ * @param expect_status The expected transaction status.
+ * @param txn           The KV transaction the reads are issued on.
+ * @param instance_id   The instance ID associated with the transaction.
+ * @param txn_id        The transaction (or sub transaction) ID to check.
+ * @param code          Set to the error code on failure.
+ * @param msg           Set to the error message on failure.
+ * @return true if the status matches `expect_status`; false (with code/msg set) otherwise.
+ */
+static bool check_transaction_status(TxnStatusPB expect_status, Transaction* txn,
+                                     const std::string& instance_id, int64_t txn_id,
+                                     MetaServiceCode& code, std::string& msg) {
+    // Get db id (and parent txn id, if any) with txn id
+    std::string index_val;
+    TxnErrorCode err = txn->get(txn_index_key({instance_id, txn_id}), &index_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id, err);
+        return false;
+    }
+
+    TxnIndexPB index_pb;
+    if (!index_pb.ParseFromString(index_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id);
+        return false;
+    }
+
+    DCHECK(index_pb.has_tablet_index() && index_pb.tablet_index().has_db_id());
+    if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) {
+        LOG(WARNING) << fmt::format(
+                "txn_index_pb is malformed, tablet_index has no db_id, txn_id={}", txn_id);
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = fmt::format("has no db_id in TxnIndexPB, txn_id={}", txn_id);
+        return false;
+    }
+    const int64_t db_id = index_pb.tablet_index().db_id();
+    // Use the parent's TxnInfoPB for sub txns; keep `txn_id` untouched for messages.
+    const int64_t status_txn_id = index_pb.has_parent_txn_id() ? index_pb.parent_txn_id() : txn_id;
+
+    std::string info_val;
+    err = txn->get(txn_info_key({instance_id, db_id, status_txn_id}), &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        msg = fmt::format("failed to get txn, txn_id={}, status_txn_id={}, err={}", txn_id,
+                          status_txn_id, err);
+        return false;
+    }
+    TxnInfoPB txn_info;
+    if (!txn_info.ParseFromString(info_val)) {
+        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+        msg = fmt::format("failed to parse txn_info, db_id={} txn_id={}", db_id, status_txn_id);
+        return false;
+    }
+    if (txn_info.status() != expect_status) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = fmt::format("txn is not in {} state, txn_id={}, status_txn_id={}, txn_status={}",
+                          TxnStatusPB_Name(expect_status), txn_id, status_txn_id,
+                          TxnStatusPB_Name(txn_info.status()));
+        return false;
+    }
+    return true;
+}
+
 /**
  * 1. Check and confirm tmp rowset kv does not exist
  * 2. Construct recycle rowset kv which contains object path
@@ -1033,6 +1100,20 @@ void 
MetaServiceImpl::prepare_rowset(::google::protobuf::RpcController* controll
         return;
     }
 
+    // Check if the prepare rowset request is invalid.
+    // If the transaction has been finished, it means this prepare rowset is a 
timeout retry request.
+    // In this case, do not write the recycle key again, otherwise it may 
cause data loss.
+    // If the rowset had load id, it means it is a load request, otherwise it 
is a
+    // compaction/sc request.
+    if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
+        !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(), 
instance_id,
+                                  rowset_meta.txn_id(), code, msg)) {
+        LOG(WARNING) << "prepare rowset failed, txn_id=" << 
rowset_meta.txn_id()
+                     << ", tablet_id=" << tablet_id << ", rowset_id=" << 
rowset_id
+                     << ", rowset_state=" << rowset_meta.rowset_state() << ", 
msg=" << msg;
+        return;
+    }
+
     // Check if commit key already exists.
     std::string val;
     err = txn->get(tmp_rs_key, &val);
@@ -1156,6 +1237,20 @@ void 
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
         return;
     }
 
+    // Check if the commit rowset request is invalid.
+    // If the transaction has been finished, it means this commit rowset is a 
timeout retry request.
+    // In this case, do not write the recycle key again, otherwise it may 
cause data loss.
+    // If the rowset has load id, it means it is a load request, otherwise it 
is a
+    // compaction/sc request.
+    if (config::enable_load_txn_status_check && rowset_meta.has_load_id() &&
+        !check_transaction_status(TxnStatusPB::TXN_STATUS_PREPARED, txn.get(), 
instance_id,
+                                  rowset_meta.txn_id(), code, msg)) {
+        LOG(WARNING) << "commit rowset failed, txn_id=" << rowset_meta.txn_id()
+                     << ", tablet_id=" << tablet_id << ", rowset_id=" << 
rowset_id
+                     << ", rowset_state=" << rowset_meta.rowset_state() << ", 
msg=" << msg;
+        return;
+    }
+
     // Check if commit key already exists.
     std::string existed_commit_val;
     err = txn->get(tmp_rs_key, &existed_commit_val);
@@ -2787,4 +2882,4 @@ void 
MetaServiceImpl::get_schema_dict(::google::protobuf::RpcController* control
     response->mutable_schema_dict()->Swap(&schema_dict);
 }
 
-} // namespace doris::cloud
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp
index 21e7dbe1a0c..490c7bb616b 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -3190,6 +3190,7 @@ void 
MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controlle
     std::string index_val;
     TxnIndexPB index_pb;
     index_pb.mutable_tablet_index()->set_db_id(db_id);
+    index_pb.set_parent_txn_id(txn_id);
     if (!index_pb.SerializeToString(&index_val)) {
         code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
         ss << "failed to serialize txn_index_pb "
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 0e8e543c494..bd593ce783c 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -8930,4 +8930,71 @@ TEST(MetaServiceTest, AddObjInfoWithRole) {
     SyncPoint::get_instance()->disable_processing();
     SyncPoint::get_instance()->clear_all_call_backs();
 }
+
+TEST(MetaServiceTest, StalePrepareRowset) {
+    auto meta_service = get_meta_service();
+
+    int64_t table_id = 1;
+    int64_t partition_id = 1;
+    int64_t tablet_id = 1;
+    int64_t db_id = 100201;
+    std::string label = "test_prepare_rowset";
+    create_tablet(meta_service.get(), table_id, 1, partition_id, tablet_id);
+
+    int64_t txn_id = 0;
+    ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id));
+    CreateRowsetResponse res;
+    auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+    rowset.mutable_load_id()->set_hi(123);
+    rowset.mutable_load_id()->set_lo(456);
+    prepare_rowset(meta_service.get(), rowset, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+    res.Clear();
+    ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+    // A retried prepare while the txn is still PREPARED is rejected as ALREADY_EXISTED.
+    prepare_rowset(meta_service.get(), rowset, res);
+    ASSERT_TRUE(res.status().msg().find("rowset already exists") != std::string::npos)
+            << res.status().msg();
+    ASSERT_EQ(res.status().code(), MetaServiceCode::ALREADY_EXISTED) << res.status().code();
+    // Once the txn is committed, a retried prepare must fail the txn status check.
+    ASSERT_NO_FATAL_FAILURE(commit_txn(meta_service.get(), db_id, txn_id, label));
+    prepare_rowset(meta_service.get(), rowset, res);
+    ASSERT_TRUE(res.status().msg().find("txn is not in") != std::string::npos)
+            << res.status().msg();
+    ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().code();
+}
+
+TEST(MetaServiceTest, StaleCommitRowset) {
+    auto meta_service = get_meta_service();
+
+    int64_t table_id = 1;
+    int64_t partition_id = 1;
+    int64_t tablet_id = 1;
+    int64_t db_id = 100201;
+    std::string label = "test_commit_rowset";
+    create_tablet(meta_service.get(), table_id, 1, partition_id, tablet_id);
+
+    int64_t txn_id = 0;
+    ASSERT_NO_FATAL_FAILURE(begin_txn(meta_service.get(), db_id, label, table_id, txn_id));
+    CreateRowsetResponse res;
+    auto rowset = create_rowset(txn_id, tablet_id, partition_id);
+    rowset.mutable_load_id()->set_hi(123);
+    rowset.mutable_load_id()->set_lo(456);
+    prepare_rowset(meta_service.get(), rowset, res);
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+    res.Clear();
+    ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+    // A retried commit while the txn is still PREPARED is accepted.
+    ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+    ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << label;
+    // Once the txn is committed, a retried commit must fail the txn status check.
+    ASSERT_NO_FATAL_FAILURE(commit_txn(meta_service.get(), db_id, txn_id, label));
+    ASSERT_NO_FATAL_FAILURE(commit_rowset(meta_service.get(), rowset, res));
+    ASSERT_TRUE(res.status().msg().find("txn is not in") != std::string::npos)
+            << res.status().msg();
+    ASSERT_EQ(res.status().code(), MetaServiceCode::INVALID_ARGUMENT) << res.status().code();
+}
+
 } // namespace doris::cloud
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 1b6cdd9bd5a..fa1336a2e7a 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -385,6 +385,7 @@ message TxnLabelPB {
 // txn_id -> db_id
 message TxnIndexPB {
     optional TabletIndexPB tablet_index = 1;
+    optional int64 parent_txn_id = 2; // written by begin_sub_txn; when set, status lookups use this txn id
 }
 
 message TxnInfoPB {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to