This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new e82ed7859a2 branch-4.1: [fix](cloud) Persist update time for sub txn commit #64739 (#64845)
e82ed7859a2 is described below
commit e82ed7859a2bdf73ef5a5d5eeda2126309544531
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jun 26 12:22:11 2026 +0800
branch-4.1: [fix](cloud) Persist update time for sub txn commit #64739 (#64845)
Cherry-picked from #64739
Co-authored-by: Refrain <[email protected]>
---
cloud/src/meta-service/meta_service_txn.cpp | 7 ++-
cloud/test/meta_service_operation_log_test.cpp | 7 +++
cloud/test/meta_service_test.cpp | 16 +++++-
.../test_txn_insert_table_update_time.groovy | 66 ++++++++++++++++++++++
4 files changed, 92 insertions(+), 4 deletions(-)
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index cd78f55b94d..18986625b7d 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -2990,10 +2990,14 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
}
// Save versions
+ int64_t version_update_time_ms =
+
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+ response->set_version_update_time_ms(version_update_time_ms);
for (auto& [partition_id, new_version] : new_versions) {
std::string ver_val;
VersionPB version_pb;
version_pb.set_version(new_version);
+ version_pb.set_update_time_ms(version_update_time_ms);
if (!version_pb.SerializeToString(&ver_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize version_pb when saving, txn_id=" <<
txn_id;
@@ -3007,7 +3011,8 @@ void MetaServiceImpl::commit_txn_with_sub_txn(const
CommitTxnRequest* request,
txn->put(version_key, ver_val);
LOG(INFO) << "put partition_version_key=" << hex(version_key)
<< " version:" << new_version << " txn_id=" << txn_id
- << " partition_id=" << partition_id;
+ << " partition_id=" << partition_id
+ << " update_time=" << version_update_time_ms;
VLOG_DEBUG << "txn_id=" << txn_id << " table_id=" << table_id
<< " partition_id=" << partition_id << " version=" <<
new_version;
diff --git a/cloud/test/meta_service_operation_log_test.cpp
b/cloud/test/meta_service_operation_log_test.cpp
index ee687cf532e..28f54fd45df 100644
--- a/cloud/test/meta_service_operation_log_test.cpp
+++ b/cloud/test/meta_service_operation_log_test.cpp
@@ -1296,6 +1296,8 @@ TEST(MetaServiceOperationLogTest, CommitTxnWithSubTxn) {
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
&commit_res, nullptr);
ASSERT_EQ(commit_res.status().code(), MetaServiceCode::OK);
+ ASSERT_TRUE(commit_res.has_version_update_time_ms());
+ ASSERT_GT(commit_res.version_update_time_ms(), 0);
}
auto txn_kv = meta_service->txn_kv();
@@ -1394,12 +1396,17 @@ TEST(MetaServiceOperationLogTest, CommitTxnWithSubTxn) {
VersionPB versioned_partition_version;
ASSERT_TRUE(versioned_partition_version.ParseFromString(partition_version_val));
ASSERT_EQ(versionstamp, commit_versionstamp);
+ ASSERT_TRUE(versioned_partition_version.has_update_time_ms());
+ ASSERT_EQ(versioned_partition_version.update_time_ms(),
+ commit_res.version_update_time_ms());
key = partition_version_key({instance_id, db_id, table_id,
partition_id});
ASSERT_EQ(txn->get(key, &partition_version_val), TxnErrorCode::TXN_OK);
VersionPB partition_version;
ASSERT_TRUE(partition_version.ParseFromString(partition_version_val));
ASSERT_EQ(versioned_partition_version.version(),
partition_version.version());
+ ASSERT_TRUE(partition_version.has_update_time_ms());
+ ASSERT_EQ(partition_version.update_time_ms(),
commit_res.version_update_time_ms());
}
{
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index afc76d01fc6..3dd78bd247a 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -2078,6 +2078,7 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
// commit txn
CommitTxnRequest req;
+ int64_t version_update_time_ms = 0;
{
brpc::Controller cntl;
req.set_cloud_unique_id("test_cloud_unique_id");
@@ -2110,6 +2111,9 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
&res, nullptr);
ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ ASSERT_TRUE(res.has_version_update_time_ms());
+ ASSERT_GT(res.version_update_time_ms(), 0);
+ version_update_time_ms = res.version_update_time_ms();
// std::cout << res.DebugString() << std::endl;
ASSERT_EQ(res.table_ids().size(), 3);
@@ -2211,18 +2215,24 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
std::string ver_val;
ASSERT_EQ(txn->get(ver_key, &ver_val), TxnErrorCode::TXN_OK);
VersionPB version;
- version.ParseFromString(ver_val);
+ ASSERT_TRUE(version.ParseFromString(ver_val));
ASSERT_EQ(version.version(), 2);
+ ASSERT_TRUE(version.has_update_time_ms());
+ ASSERT_EQ(version.update_time_ms(), version_update_time_ms);
ver_key = partition_version_key({mock_instance, db_id, t1, t1_p2});
ASSERT_EQ(txn->get(ver_key, &ver_val), TxnErrorCode::TXN_OK);
- version.ParseFromString(ver_val);
+ ASSERT_TRUE(version.ParseFromString(ver_val));
ASSERT_EQ(version.version(), 2);
+ ASSERT_TRUE(version.has_update_time_ms());
+ ASSERT_EQ(version.update_time_ms(), version_update_time_ms);
ver_key = partition_version_key({mock_instance, db_id, t1, t1_p1});
ASSERT_EQ(txn->get(ver_key, &ver_val), TxnErrorCode::TXN_OK);
- version.ParseFromString(ver_val);
+ ASSERT_TRUE(version.ParseFromString(ver_val));
ASSERT_EQ(version.version(), 3);
+ ASSERT_TRUE(version.has_update_time_ms());
+ ASSERT_EQ(version.update_time_ms(), version_update_time_ms);
// table version
std::string table_ver_key = table_version_key({mock_instance, db_id,
t1});
diff --git
a/regression-test/suites/cloud_p0/version/test_txn_insert_table_update_time.groovy
b/regression-test/suites/cloud_p0/version/test_txn_insert_table_update_time.groovy
new file mode 100644
index 00000000000..d3f3e3c34ae
--- /dev/null
+++
b/regression-test/suites/cloud_p0/version/test_txn_insert_table_update_time.groovy
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_txn_insert_table_update_time") {
+ if (!isCloudMode()) {
+ return
+ }
+
+ def tbl = "test_txn_insert_table_update_time"
+ sql """ DROP TABLE IF EXISTS ${tbl} """
+ sql """
+ CREATE TABLE ${tbl} (
+ id INT NOT NULL,
+ value INT NOT NULL
+ )
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ def beforeRows = sql """
+ SELECT UNIX_TIMESTAMP(update_time)
+ FROM information_schema.tables
+ WHERE table_schema = DATABASE() AND table_name = '${tbl}'
+ """
+ assertEquals(1, beforeRows.size())
+ assertTrue(beforeRows[0][0] != null)
+ def beforeUpdateTime = beforeRows[0][0]
+
+ Thread.sleep(1100)
+
+ sql """ BEGIN """
+ sql """ INSERT INTO ${tbl} VALUES (1, 10) """
+ sql """ INSERT INTO ${tbl} VALUES (2, 20) """
+ sql """ COMMIT """
+ sql """ SYNC """
+
+ def countRows = sql """ SELECT COUNT(*) FROM ${tbl} """
+ assertEquals(2, countRows[0][0])
+
+ def afterRows = sql """
+ SELECT UNIX_TIMESTAMP(update_time)
+ FROM information_schema.tables
+ WHERE table_schema = DATABASE() AND table_name = '${tbl}'
+ """
+ assertEquals(1, afterRows.size())
+ assertTrue(afterRows[0][0] != null)
+ def afterUpdateTime = afterRows[0][0]
+ assertTrue(afterUpdateTime > beforeUpdateTime)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]