This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0983526722a branch-3.0-pick: [Fix](cloud-mow) Check partition's
version to avoid wrongly update visible versions' delete bitmaps (#49710)
(#49796)
0983526722a is described below
commit 0983526722aaca0817a765433e2440d0fc39230a
Author: bobhan1 <[email protected]>
AuthorDate: Mon Apr 7 12:04:28 2025 +0800
branch-3.0-pick: [Fix](cloud-mow) Check partition's version to avoid
wrongly update visible versions' delete bitmaps (#49710) (#49796)
pick https://github.com/apache/doris/pull/49710
---
.../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 3 +-
be/src/cloud/cloud_meta_mgr.cpp | 11 +-
be/src/cloud/cloud_meta_mgr.h | 3 +-
be/src/cloud/cloud_tablet.cpp | 12 +-
be/src/cloud/cloud_tablet.h | 5 +-
be/src/olap/base_tablet.cpp | 3 +-
be/src/olap/base_tablet.h | 4 +-
be/src/olap/tablet.cpp | 3 +-
be/src/olap/tablet.h | 3 +-
cloud/src/meta-service/meta_service.cpp | 114 +++++-
cloud/src/meta-service/meta_service_txn.cpp | 1 +
cloud/test/meta_service_test.cpp | 384 ++++++++++++++++++++-
gensrc/proto/cloud.proto | 7 +
13 files changed, 536 insertions(+), 17 deletions(-)
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 86b369b3db1..39c0575c8b1 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -259,7 +259,8 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
// we still need to update delete bitmap KVs to MS when we skip to
calcalate delete bitmaps,
// because the pending delete bitmap KVs in MS we wrote before may
have been removed and replaced by other txns
- RETURN_IF_ERROR(tablet->save_delete_bitmap_to_ms(_version,
_transaction_id, delete_bitmap));
+ RETURN_IF_ERROR(tablet->save_delete_bitmap_to_ms(_version,
_transaction_id, delete_bitmap,
+ _version));
LOG(INFO) << "tablet=" << _tablet_id << ", txn=" << _transaction_id
<< ", publish_status=SUCCEED, not need to re-calculate
delete_bitmaps.";
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index c92d5e9404e..6f6024f1912 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -1141,7 +1141,9 @@ Status CloudMetaMgr::update_tablet_schema(int64_t
tablet_id, const TabletSchema&
}
Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t
lock_id,
- int64_t initiator, DeleteBitmap*
delete_bitmap) {
+ int64_t initiator, DeleteBitmap*
delete_bitmap,
+ int64_t txn_id, bool is_explicit_txn,
+ int64_t next_visible_version) {
VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id();
UpdateDeleteBitmapRequest req;
UpdateDeleteBitmapResponse res;
@@ -1151,6 +1153,13 @@ Status CloudMetaMgr::update_delete_bitmap(const
CloudTablet& tablet, int64_t loc
req.set_tablet_id(tablet.tablet_id());
req.set_lock_id(lock_id);
req.set_initiator(initiator);
+ req.set_is_explicit_txn(is_explicit_txn);
+ if (txn_id > 0) {
+ req.set_txn_id(txn_id);
+ }
+ if (next_visible_version > 0) {
+ req.set_next_visible_version(next_visible_version);
+ }
for (auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
req.add_rowset_ids(std::get<0>(key).to_string());
req.add_segment_ids(std::get<1>(key));
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index d06e55e69ad..a666a5e4d16 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -99,7 +99,8 @@ public:
Status update_tablet_schema(int64_t tablet_id, const TabletSchema&
tablet_schema);
Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator,
- DeleteBitmap* delete_bitmap);
+ DeleteBitmap* delete_bitmap, int64_t txn_id =
-1,
+ bool is_explicit_txn = false, int64_t
next_visible_version = -1);
Status cloud_update_delete_bitmap_without_lock(const CloudTablet& tablet,
DeleteBitmap*
delete_bitmap);
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index a1026c9518d..cf3fe051bec 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -699,7 +699,8 @@ CalcDeleteBitmapExecutor*
CloudTablet::calc_delete_bitmap_executor() {
Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id,
DeleteBitmapPtr delete_bitmap,
RowsetWriter* rowset_writer,
- const RowsetIdUnorderedSet&
cur_rowset_ids) {
+ const RowsetIdUnorderedSet&
cur_rowset_ids,
+ int64_t next_visible_version) {
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
// update delete bitmap info, in order to avoid recalculation when trying
again
@@ -715,7 +716,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo*
txn_info, int64_t tx
RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
}
- RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id,
delete_bitmap));
+ RETURN_IF_ERROR(
+ save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap,
next_visible_version));
// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache
because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when
re-calculating the delete bitmap, during which it will do
@@ -745,7 +747,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo*
txn_info, int64_t tx
}
Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t
txn_id,
- DeleteBitmapPtr delete_bitmap) {
+ DeleteBitmapPtr delete_bitmap,
+ int64_t next_visible_version) {
DeleteBitmapPtr new_delete_bitmap =
std::make_shared<DeleteBitmap>(tablet_id());
for (auto iter = delete_bitmap->delete_bitmap.begin();
iter != delete_bitmap->delete_bitmap.end(); ++iter) {
@@ -758,7 +761,8 @@ Status CloudTablet::save_delete_bitmap_to_ms(int64_t
cur_version, int64_t txn_id
}
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, txn_id,
LOAD_INITIATOR_ID,
-
new_delete_bitmap.get()));
+
new_delete_bitmap.get(), txn_id, false,
+
next_visible_version));
return Status::OK();
}
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index b2d6e8921b0..4226e26a0f5 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -170,10 +170,11 @@ public:
Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
- const RowsetIdUnorderedSet& cur_rowset_ids)
override;
+ const RowsetIdUnorderedSet& cur_rowset_ids,
+ int64_t next_visible_version = -1) override;
Status save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
- DeleteBitmapPtr delete_bitmap);
+ DeleteBitmapPtr delete_bitmap, int64_t
next_visible_version);
Status calc_delete_bitmap_for_compaction(const
std::vector<RowsetSharedPtr>& input_rowsets,
const RowsetSharedPtr&
output_rowset,
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 4250a9f09b8..c33043b3b64 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -1352,7 +1352,8 @@ Status BaseTablet::update_delete_bitmap(const
BaseTabletSPtr& self, TabletTxnInf
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
auto t5 = watch.get_elapse_time_us();
RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap,
- transient_rs_writer.get(),
cur_rowset_ids));
+ transient_rs_writer.get(),
cur_rowset_ids,
+ cur_version));
// defensive check, check that the delete bitmap cache we wrote is correct
RETURN_IF_ERROR(self->check_delete_bitmap_cache(txn_id,
delete_bitmap.get()));
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 693f08dcac7..40928e63729 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -228,10 +228,10 @@ public:
static Status update_delete_bitmap(const BaseTabletSPtr& self,
TabletTxnInfo* txn_info,
int64_t txn_id, int64_t txn_expiration
= 0);
-
virtual Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id,
DeleteBitmapPtr delete_bitmap,
RowsetWriter* rowset_writer,
- const RowsetIdUnorderedSet&
cur_rowset_ids) = 0;
+ const RowsetIdUnorderedSet&
cur_rowset_ids,
+ int64_t next_visible_version = -1) = 0;
virtual CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() = 0;
void calc_compaction_output_rowset_delete_bitmap(
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 78cb6f23844..a1011661fc0 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2483,7 +2483,8 @@ CalcDeleteBitmapExecutor*
Tablet::calc_delete_bitmap_executor() {
Status Tablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
- const RowsetIdUnorderedSet& cur_rowset_ids) {
+ const RowsetIdUnorderedSet& cur_rowset_ids,
+ int64_t next_visible_version) {
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 96bc5d87e3c..ff18ce50657 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -417,7 +417,8 @@ public:
CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
- const RowsetIdUnorderedSet& cur_rowset_ids)
override;
+ const RowsetIdUnorderedSet& cur_rowset_ids,
+ int64_t next_visible_version = -1) override;
void merge_delete_bitmap(const DeleteBitmap& delete_bitmap);
bool check_all_rowset_segment();
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index 6ceee180939..78daf880c54 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1829,6 +1829,106 @@ static bool
process_pending_delete_bitmap(MetaServiceCode& code, std::string& ms
return true;
}
+// When a load txn retries in publish phase with a different version to publish,
it will gain the delete bitmap lock
+// many times. These locks are logically *different*, but they are the same in the
current implementation because they have
+// the same lock_id and initiator and don't carry version info. If some delete
bitmap calculation task with version X
+// on BE runs long and tries to update delete bitmaps on MS when the txn has gained
the lock in a later retry
+// with version Y (Y > X) to publish, it may wrongly update version X's delete
bitmaps because the lock doesn't carry version info.
+//
+// This function checks whether the partition version is correct when updating
the delete bitmap
+// to avoid wrongly updating a visible version's delete bitmaps.
+// 1. get the db id with txn id
+// 2. get the partition version with db id, table id and partition id
+// 3. check if the partition version matches the updating version
+static bool check_partition_version_when_update_delete_bitmap(
+ MetaServiceCode& code, std::string& msg, std::unique_ptr<Transaction>&
txn,
+ std::string& instance_id, int64_t table_id, int64_t partition_id,
int64_t tablet_id,
+ int64_t txn_id, int64_t next_visible_version) {
+ if (partition_id <= 0) {
+ LOG(WARNING) << fmt::format(
+ "invalid partition_id, skip to check partition version.
txn={}, "
+ "table_id={}, partition_id={}, tablet_id={}",
+ txn_id, table_id, partition_id, tablet_id);
+ return true;
+ }
+ // Get db id with txn id
+ std::string index_val;
+ const std::string index_key = txn_index_key({instance_id, txn_id});
+ auto err = txn->get(index_key, &index_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get db id, txn_id={} err={}", txn_id,
err);
+ LOG(WARNING) << msg;
+ return false;
+ }
+
+ TxnIndexPB index_pb;
+ if (!index_pb.ParseFromString(index_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("failed to parse txn_index_pb, txn_id={}", txn_id);
+ LOG(WARNING) << msg;
+ return false;
+ }
+
+ DCHECK(index_pb.has_tablet_index())
+ << fmt::format("txn={}, table_id={}, partition_id={},
tablet_id={}, index_pb={}",
+ txn_id, table_id, partition_id, tablet_id,
proto_to_json(index_pb));
+ DCHECK(index_pb.tablet_index().has_db_id())
+ << fmt::format("txn={}, table_id={}, partition_id={},
tablet_id={}, index_pb={}",
+ txn_id, table_id, partition_id, tablet_id,
proto_to_json(index_pb));
+ if (!index_pb.has_tablet_index() || !index_pb.tablet_index().has_db_id()) {
+ LOG(WARNING) << fmt::format(
+ "has no db_id in TxnIndexPB, skip to check partition version.
txn={}, "
+ "table_id={}, partition_id={}, tablet_id={}, index_pb={}",
+ txn_id, table_id, partition_id, tablet_id,
proto_to_json(index_pb));
+ return true;
+ }
+ int64_t db_id = index_pb.tablet_index().db_id();
+
+ std::string ver_key = partition_version_key({instance_id, db_id, table_id,
partition_id});
+ std::string ver_val;
+ err = txn->get(ver_key, &ver_val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to get partition version, txn_id={},
tablet={}, err={}", txn_id,
+ tablet_id, err);
+ LOG(WARNING) << msg;
+ return false;
+ }
+
+ int64_t cur_max_version {-1};
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ cur_max_version = 1;
+ } else {
+ VersionPB version_pb;
+ if (!version_pb.ParseFromString(ver_val)) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = fmt::format("failed to parse version_pb, txn_id={},
tablet={}, key={}", txn_id,
+ tablet_id, hex(ver_key));
+ LOG(WARNING) << msg;
+ return false;
+ }
+ DCHECK(version_pb.has_version());
+ cur_max_version = version_pb.version();
+
+ if (version_pb.pending_txn_ids_size() > 0) {
+ DCHECK(version_pb.pending_txn_ids_size() == 1);
+ cur_max_version += version_pb.pending_txn_ids_size();
+ }
+ }
+
+ if (cur_max_version + 1 != next_visible_version) {
+ code = MetaServiceCode::VERSION_NOT_MATCH;
+ msg = fmt::format(
+ "check version failed when update_delete_bitmap, txn={},
table_id={}, "
+ "partition_id={}, tablet_id={}, found partition's max version
is {}, but "
+ "request next_visible_version is {}",
+ txn_id, table_id, partition_id, tablet_id, cur_max_version,
next_visible_version);
+ return false;
+ }
+ return true;
+}
+
void MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController*
controller,
const UpdateDeleteBitmapRequest*
request,
UpdateDeleteBitmapResponse*
response,
@@ -1880,7 +1980,17 @@ void
MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
}
}
- // 3. store all pending delete bitmap for this txn
+ // 3. check if partition's version matches
+ if (request->lock_id() > 0 && request->has_txn_id() &&
request->has_partition_id() &&
+ request->has_next_visible_version()) {
+ if (!check_partition_version_when_update_delete_bitmap(
+ code, msg, txn, instance_id, table_id,
request->partition_id(), tablet_id,
+ request->txn_id(), request->next_visible_version())) {
+ return;
+ }
+ }
+
+ // 4. store all pending delete bitmap for this txn
PendingDeleteBitmapPB delete_bitmap_keys;
for (size_t i = 0; i < request->rowset_ids_size(); ++i) {
MetaDeleteBitmapInfo key_info {instance_id, tablet_id,
request->rowset_ids(i),
@@ -1919,7 +2029,7 @@ void
MetaServiceImpl::update_delete_bitmap(google::protobuf::RpcController* cont
}
}
- // 4. Update delete bitmap for curent txn
+ // 5. Update delete bitmap for current txn
size_t current_key_count = 0;
size_t current_value_count = 0;
size_t total_key_count = 0;
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 8bec142e13c..b251ec64ed0 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -3163,6 +3163,7 @@ void
MetaServiceImpl::begin_sub_txn(::google::protobuf::RpcController* controlle
const std::string index_key = txn_index_key({instance_id, sub_txn_id});
std::string index_val;
TxnIndexPB index_pb;
+ index_pb.mutable_tablet_index()->set_db_id(db_id);
if (!index_pb.SerializeToString(&index_val)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
ss << "failed to serialize txn_index_pb "
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index 10a5b3c6f18..0a1be69e1eb 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -154,6 +154,18 @@ static void create_tablet(MetaServiceProxy* meta_service,
int64_t table_id, int6
ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
}
+static void create_tablet_with_db_id(MetaServiceProxy* meta_service, int64_t
db_id,
+ int64_t table_id, int64_t index_id,
int64_t partition_id,
+ int64_t tablet_id) {
+ brpc::Controller cntl;
+ CreateTabletsRequest req;
+ CreateTabletsResponse res;
+ req.set_db_id(db_id);
+ add_tablet(req, table_id, index_id, partition_id, tablet_id);
+ meta_service->create_tablets(&cntl, &req, &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK) << tablet_id;
+}
+
static void begin_txn(MetaServiceProxy* meta_service, int64_t db_id, const
std::string& label,
int64_t table_id, int64_t& txn_id) {
brpc::Controller cntl;
@@ -2050,7 +2062,7 @@ TEST(MetaServiceTest, CommitTxnWithSubTxnTest) {
index_key = txn_index_key({mock_instance, sub_txn_id3});
ASSERT_EQ(txn->get(index_key, &index_val), TxnErrorCode::TXN_OK);
txn_index.ParseFromString(index_val);
- ASSERT_FALSE(txn_index.has_tablet_index());
+ ASSERT_TRUE(txn_index.has_tablet_index());
// txn_label
std::string label_key = txn_label_key({mock_instance, db_id, label});
@@ -4923,6 +4935,376 @@ TEST(MetaServiceTest, UpdateDeleteBitmapWithBigKeys) {
ASSERT_EQ(update_delete_bitmap_res.status().code(), MetaServiceCode::OK);
}
+static void set_partition_version(MetaServiceProxy* meta_service,
std::string_view instance_id,
+ int64_t db_id, int64_t table_id, int64_t
partition_id,
+ int64_t version, std::vector<int64_t>
pending_txn_ids = {}) {
+ std::string ver_key = partition_version_key({instance_id, db_id, table_id,
partition_id});
+ std::string ver_val;
+ VersionPB version_pb;
+ version_pb.set_version(version);
+ if (!pending_txn_ids.empty()) {
+ for (auto txn_id : pending_txn_ids) {
+ version_pb.add_pending_txn_ids(txn_id);
+ }
+ }
+ ASSERT_TRUE(version_pb.SerializeToString(&ver_val));
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+ txn->put(ver_key, ver_val);
+ ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+}
+
+static void begin_txn_and_commit_rowset(MetaServiceProxy* meta_service, const
std::string& label,
+ int64_t db_id, int64_t table_id,
int64_t partition_id,
+ int64_t tablet_id, int64_t* txn_id) {
+ begin_txn(meta_service, db_id, label, table_id, *txn_id);
+ CreateRowsetResponse res;
+ auto rowset = create_rowset(*txn_id, tablet_id, partition_id);
+ prepare_rowset(meta_service, rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+ res.Clear();
+ commit_rowset(meta_service, rowset, res);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+}
+
+static void get_delete_bitmap_update_lock(MetaServiceProxy* meta_service,
int64_t table_id,
+ int64_t partition_id, int64_t
lock_id,
+ int64_t initiator) {
+ brpc::Controller cntl;
+ GetDeleteBitmapUpdateLockRequest get_lock_req;
+ GetDeleteBitmapUpdateLockResponse get_lock_res;
+ get_lock_req.set_cloud_unique_id("test_cloud_unique_id");
+ get_lock_req.set_table_id(table_id);
+ get_lock_req.add_partition_ids(partition_id);
+ get_lock_req.set_expiration(5);
+ get_lock_req.set_lock_id(lock_id);
+ get_lock_req.set_initiator(initiator);
+ meta_service->get_delete_bitmap_update_lock(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&get_lock_req,
+ &get_lock_res, nullptr);
+ ASSERT_EQ(get_lock_res.status().code(), MetaServiceCode::OK);
+}
+
+static void update_delete_bitmap(MetaServiceProxy* meta_service,
+ UpdateDeleteBitmapRequest&
update_delete_bitmap_req,
+ UpdateDeleteBitmapResponse&
update_delete_bitmap_res,
+ int64_t table_id, int64_t partition_id,
int64_t lock_id,
+ int64_t initiator, int64_t tablet_id, int64_t
txn_id,
+ int64_t next_visible_version, std::string
data = "1111") {
+ brpc::Controller cntl;
+ update_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
+ update_delete_bitmap_req.set_table_id(table_id);
+ update_delete_bitmap_req.set_partition_id(partition_id);
+ update_delete_bitmap_req.set_lock_id(lock_id);
+ update_delete_bitmap_req.set_initiator(initiator);
+ update_delete_bitmap_req.set_tablet_id(tablet_id);
+ update_delete_bitmap_req.set_txn_id(txn_id);
+ update_delete_bitmap_req.set_next_visible_version(next_visible_version);
+ update_delete_bitmap_req.add_rowset_ids("123");
+ update_delete_bitmap_req.add_segment_ids(0);
+ update_delete_bitmap_req.add_versions(next_visible_version);
+ update_delete_bitmap_req.add_segment_delete_bitmaps(data);
+
meta_service->update_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl),
+ &update_delete_bitmap_req,
&update_delete_bitmap_res,
+ nullptr);
+}
+
+TEST(MetaServiceTest, UpdateDeleteBitmapCheckPartitionVersion) {
+ auto meta_service = get_meta_service();
+ brpc::Controller cntl;
+
+ extern std::string get_instance_id(const std::shared_ptr<ResourceManager>&
rc_mgr,
+ const std::string& cloud_unique_id);
+ auto instance_id = get_instance_id(meta_service->resource_mgr(),
"test_cloud_unique_id");
+
+ {
+ // 1. normal path
+ // 1.1 has partition version and request version matches
+ int64_t db_id = 999;
+ int64_t table_id = 1001;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t cur_max_version = 100;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label11", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+ set_partition_version(meta_service.get(), instance_id, db_id,
table_id, t1p1,
+ cur_max_version);
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id,
+ cur_max_version + 1);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::OK);
+ }
+
+ {
+ // 1. normal path
+ // 1.2 does not have partition version KV and request version matches
+ int64_t db_id = 999;
+ int64_t table_id = 1002;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label12", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id, 2);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::OK);
+ }
+
+ {
+ // 1. normal path
+ // 1.3 has partition version and pending txn, and request version
matches
+ int64_t db_id = 999;
+ int64_t table_id = 1003;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t cur_max_version = 120;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label13", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+ set_partition_version(meta_service.get(), instance_id, db_id,
table_id, t1p1,
+ cur_max_version, {12345});
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id,
+ cur_max_version + 2);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::OK);
+ }
+}
+
+TEST(MetaServiceTest, UpdateDeleteBitmapCheckPartitionVersionFail) {
+ auto meta_service = get_meta_service();
+ brpc::Controller cntl;
+
+ extern std::string get_instance_id(const std::shared_ptr<ResourceManager>&
rc_mgr,
+ const std::string& cloud_unique_id);
+ auto instance_id = get_instance_id(meta_service->resource_mgr(),
"test_cloud_unique_id");
+
+ {
+ // 2. abnormal path
+ // 2.1 has partition version but request version does not match
+ int64_t db_id = 999;
+ int64_t table_id = 2001;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t cur_max_version = 100;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label21", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+ set_partition_version(meta_service.get(), instance_id, db_id,
table_id, t1p1,
+ cur_max_version);
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ // wrong version
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id,
+ cur_max_version + 2);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::VERSION_NOT_MATCH);
+ }
+
+ {
+ // 2. abnormal path
+ // 2.2 does not have partition version KV and request version does not
match
+ int64_t db_id = 999;
+ int64_t table_id = 2002;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label22", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ // first load, wrong version
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id, 10);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::VERSION_NOT_MATCH);
+ }
+
+ {
+ // 2. abnormal path
+ // 2.3 has partition version and pending txn, and request version
does not match
+ int64_t db_id = 999;
+ int64_t table_id = 2003;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t cur_max_version = 120;
+ int64_t txn_id;
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id,
+ index_id, t1p1,
tablet_id));
+ begin_txn_and_commit_rowset(meta_service.get(), "label23", db_id,
table_id, t1p1, tablet_id,
+ &txn_id);
+ int64_t lock_id = txn_id;
+
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1,
lock_id, initiator);
+ set_partition_version(meta_service.get(), instance_id, db_id,
table_id, t1p1,
+ cur_max_version, {12345});
+
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ // wrong version
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id,
+ cur_max_version + 1);
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::VERSION_NOT_MATCH);
+ }
+}
+
+TEST(MetaServiceTest, UpdateDeleteBitmapFailCase) {
+ // simulate the situation described in
https://github.com/apache/doris/pull/49710
+ auto meta_service = get_meta_service();
+ brpc::Controller cntl;
+ extern std::string get_instance_id(const std::shared_ptr<ResourceManager>&
rc_mgr,
+ const std::string& cloud_unique_id);
+ auto instance_id = get_instance_id(meta_service->resource_mgr(),
"test_cloud_unique_id");
+
+ int64_t db_id = 1999;
+ int64_t table_id = 1001;
+ int64_t index_id = 4001;
+ int64_t t1p1 = 2001;
+ int64_t tablet_id = 3001;
+ int64_t initiator = -1;
+ int64_t cur_max_version = 100;
+ set_partition_version(meta_service.get(), instance_id, db_id, table_id,
t1p1, cur_max_version);
+ ASSERT_NO_FATAL_FAILURE(create_tablet_with_db_id(meta_service.get(),
db_id, table_id, index_id,
+ t1p1, tablet_id));
+
+ // txn1 begins
+ int64_t txn_id1;
+ begin_txn_and_commit_rowset(meta_service.get(), "label31", db_id,
table_id, t1p1, tablet_id,
+ &txn_id1);
+ int64_t txn1_version_to_publish = cur_max_version + 1;
+ // txn1 gains the lock and tries to publish with version 101
+ int64_t lock_id = txn_id1;
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id,
initiator);
+
+ // txn1 failed due to calculation timeout and removes the delete bitmap
lock
+ RemoveDeleteBitmapUpdateLockRequest remove_req;
+ RemoveDeleteBitmapUpdateLockResponse remove_res;
+ remove_req.set_cloud_unique_id("test_cloud_unique_id");
+ remove_req.set_table_id(table_id);
+ remove_req.set_lock_id(lock_id);
+ remove_req.set_initiator(-1);
+ meta_service->remove_delete_bitmap_update_lock(
+ reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&remove_req, &remove_res,
+ nullptr);
+ ASSERT_EQ(remove_res.status().code(), MetaServiceCode::OK);
+
+ // txn2 gains the lock and succeeds to publish with version 101
+ int64_t txn_id2;
+ begin_txn_and_commit_rowset(meta_service.get(), "label32", db_id,
table_id, t1p1, tablet_id,
+ &txn_id2);
+ lock_id = txn_id2;
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id,
initiator);
+
+ int64_t txn2_version_to_publish = cur_max_version + 1;
+ UpdateDeleteBitmapRequest update_delete_bitmap_req;
+ UpdateDeleteBitmapResponse update_delete_bitmap_res;
+ std::string data1 = "1234";
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id2,
+ txn2_version_to_publish, data1);
+
+ CommitTxnRequest req;
+ req.set_cloud_unique_id("test_cloud_unique_id");
+ req.set_db_id(db_id);
+ req.set_txn_id(txn_id2);
+ req.add_mow_table_ids(table_id);
+ CommitTxnResponse res;
+
meta_service->commit_txn(reinterpret_cast<::google::protobuf::RpcController*>(&cntl),
&req,
+ &res, nullptr);
+ ASSERT_EQ(res.status().code(), MetaServiceCode::OK);
+
+ std::unique_ptr<Transaction> txn;
+ ASSERT_EQ(meta_service->txn_kv()->create_txn(&txn), TxnErrorCode::TXN_OK);
+ std::string ver_key = partition_version_key({instance_id, db_id, table_id,
t1p1});
+ std::string ver_val;
+ VersionPB version_pb;
+ auto ret = txn->get(ver_key, &ver_val);
+ ASSERT_EQ(ret, TxnErrorCode::TXN_OK);
+ ASSERT_TRUE(version_pb.ParseFromString(ver_val));
+ ASSERT_EQ(version_pb.version(), cur_max_version + 1);
+
+ std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id,
table_id, -1});
+ std::string lock_val;
+ ret = txn->get(lock_key, &lock_val);
+ ASSERT_EQ(ret, TxnErrorCode::TXN_KEY_NOT_FOUND);
+
+ // txn1 retries to publish and gains the lock, tries to publish with version
102
+ lock_id = txn_id1;
+ get_delete_bitmap_update_lock(meta_service.get(), table_id, t1p1, lock_id,
initiator);
+
+ // txn1's previous calculation task finishes and tries to update delete
bitmap with version 101
+ std::string data2 = "5678";
+ update_delete_bitmap(meta_service.get(), update_delete_bitmap_req,
update_delete_bitmap_res,
+ table_id, t1p1, lock_id, initiator, tablet_id,
txn_id1,
+ txn1_version_to_publish, data2);
+ // this should fail
+ ASSERT_EQ(update_delete_bitmap_res.status().code(),
MetaServiceCode::VERSION_NOT_MATCH);
+
+ GetDeleteBitmapRequest get_delete_bitmap_req;
+ GetDeleteBitmapResponse get_delete_bitmap_res;
+ get_delete_bitmap_req.set_cloud_unique_id("test_cloud_unique_id");
+ get_delete_bitmap_req.set_tablet_id(tablet_id);
+ get_delete_bitmap_req.add_rowset_ids("123");
+ get_delete_bitmap_req.add_begin_versions(0);
+ get_delete_bitmap_req.add_end_versions(cur_max_version + 1);
+
meta_service->get_delete_bitmap(reinterpret_cast<google::protobuf::RpcController*>(&cntl),
+ &get_delete_bitmap_req,
&get_delete_bitmap_res, nullptr);
+ ASSERT_EQ(get_delete_bitmap_res.status().code(), MetaServiceCode::OK);
+ ASSERT_EQ(get_delete_bitmap_res.rowset_ids_size(), 1);
+ ASSERT_EQ(get_delete_bitmap_res.versions_size(), 1);
+ ASSERT_EQ(get_delete_bitmap_res.segment_ids_size(), 1);
+ ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps_size(), 1);
+ ASSERT_EQ(get_delete_bitmap_res.segment_delete_bitmaps(0), data1);
+}
+
TEST(MetaServiceTest, UpdateDeleteBitmap) {
auto meta_service = get_meta_service();
diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto
index 0add57c3de0..c18b35ce15f 100644
--- a/gensrc/proto/cloud.proto
+++ b/gensrc/proto/cloud.proto
@@ -1394,6 +1394,7 @@ enum MetaServiceCode {
LOCK_EXPIRED = 8001;
LOCK_CONFLICT = 8002;
ROWSETS_EXPIRED = 8003;
+ VERSION_NOT_MATCH = 8004;
// partial update
ROWSET_META_NOT_FOUND = 9001;
@@ -1419,6 +1420,12 @@ message UpdateDeleteBitmapRequest {
// Serialized roaring bitmaps indexed with {rowset_id, segment_id, version}
repeated bytes segment_delete_bitmaps = 10;
optional bool unlock = 11;
+ // to determine whether this is in an explicit txn and whether it's the
first sub txn
+ optional bool is_explicit_txn = 12;
+ optional int64 txn_id = 13;
+
+ // for load txn only
+ optional int64 next_visible_version = 14;
}
message UpdateDeleteBitmapResponse {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]