This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new afcc6170f6c [fix](txn_manager) Add ingested rowsets to unused rowsets
when removing txn (#37417)
afcc6170f6c is described below
commit afcc6170f6ca8ae8553e6a4a4d21000a0cb905ea
Author: walter <[email protected]>
AuthorDate: Wed Jul 10 14:25:44 2024 +0800
[fix](txn_manager) Add ingested rowsets to unused rowsets when removing txn
(#37417)
Generally speaking, once a rowset has a version it is no longer
considered pending. However, a rowset created by ingesting binlogs
already has a version and must still be treated as pending, because the
ingesting txn has not yet been committed.
This PR updates the condition for determining the pending state: a
rowset whose state is COMMITTED is still treated as pending, so its txn
is allowed to roll back even if a version exists.
Cherry-pick #36551
---
be/src/olap/rowset/rowset.cpp | 11 +++++++++-
be/src/olap/rowset/rowset.h | 1 +
be/src/olap/storage_engine.cpp | 10 +++++++--
be/src/olap/txn_manager.cpp | 7 +++---
be/test/olap/test_data/rowset_meta3.json | 22 +++++++++++++++++++
be/test/olap/txn_manager_test.cpp | 37 ++++++++++++++++++++++++++++++++
6 files changed, 82 insertions(+), 6 deletions(-)
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index f4667d3fb63..b5b68f4d38e 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -30,7 +30,16 @@ static bvar::Adder<size_t>
g_total_rowset_num("doris_total_rowset_num");
Rowset::Rowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr&
rowset_meta)
: _rowset_meta(rowset_meta), _refs_by_reader(0) {
- _is_pending = !_rowset_meta->has_version();
+ _is_pending = true;
+
+ // Generally speaking, as long as a rowset has a version, it can be
considered not to be in a pending state.
+ // However, if the rowset was created through ingesting binlogs, it will
have a version but should still be
+ // considered in a pending state because the ingesting txn has not yet
been committed.
+ if (_rowset_meta->has_version() && _rowset_meta->start_version() > 0 &&
+ _rowset_meta->rowset_state() != COMMITTED) {
+ _is_pending = false;
+ }
+
if (_is_pending) {
_is_cumulative = false;
} else {
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 6194703176f..87cfe0b0bea 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -163,6 +163,7 @@ public:
int64_t newest_write_timestamp() const { return
rowset_meta()->newest_write_timestamp(); }
bool is_segments_overlapping() const { return
rowset_meta()->is_segments_overlapping(); }
KeysType keys_type() { return _schema->keys_type(); }
+ RowsetStatePB rowset_meta_state() const { return
rowset_meta()->rowset_state(); }
// remove all files in this rowset
// TODO should we rename the method to remove_files() to be more specific?
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 164f312a8da..05fd873fcc6 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -710,8 +710,14 @@ void StorageEngine::clear_transaction_task(const
TTransactionId transaction_id,
<< ", tablet_uid=" << tablet_info.first.tablet_uid;
continue;
}
-
static_cast<void>(StorageEngine::instance()->txn_manager()->delete_txn(
- partition_id, tablet, transaction_id));
+ Status s =
StorageEngine::instance()->txn_manager()->delete_txn(partition_id, tablet,
+
transaction_id);
+ if (!s.ok()) {
+ LOG(WARNING) << "failed to clear transaction. txn_id=" <<
transaction_id
+ << ", partition_id=" << partition_id
+ << ", tablet_id=" << tablet_info.first.tablet_id
+ << ", status=" << s.to_string();
+ }
}
}
LOG(INFO) << "finish to clear transaction task. transaction_id=" <<
transaction_id;
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index ac9367be23b..2ed1ac5674d 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -604,13 +604,14 @@ Status TxnManager::delete_txn(OlapMeta* meta,
TPartitionId partition_id,
auto& load_info = load_itr->second;
auto& rowset = load_info->rowset;
if (rowset != nullptr && meta != nullptr) {
- if (rowset->version().first > 0) {
+ if (!rowset->is_pending()) {
return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
"could not delete transaction from engine, just remove
it from memory not "
"delete from disk, because related rowset already
published. partition_id: "
- "{}, transaction_id: {}, tablet: {}, rowset id: {},
version:{}",
+ "{}, transaction_id: {}, tablet: {}, rowset id: {},
version: {}, state: {}",
key.first, key.second, tablet_info.to_string(),
- rowset->rowset_id().to_string(),
rowset->version().to_string());
+ rowset->rowset_id().to_string(),
rowset->version().to_string(),
+ RowsetStatePB_Name(rowset->rowset_meta_state()));
} else {
static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid,
rowset->rowset_id()));
#ifndef BE_TEST
diff --git a/be/test/olap/test_data/rowset_meta3.json
b/be/test/olap/test_data/rowset_meta3.json
new file mode 100644
index 00000000000..f6048f93950
--- /dev/null
+++ b/be/test/olap/test_data/rowset_meta3.json
@@ -0,0 +1,22 @@
+{
+ "rowset_id": 10002,
+ "partition_id": 10001,
+ "tablet_id": 12046,
+ "tablet_schema_hash": 365187263,
+ "rowset_type": "BETA_ROWSET",
+ "rowset_state": "COMMITTED",
+ "start_version": 0,
+ "end_version": 1,
+ "num_rows": 0,
+ "total_disk_size": 0,
+ "data_disk_size": 0,
+ "index_disk_size": 0,
+ "empty": true,
+ "creation_time": 1552911435,
+ "tablet_uid": {
+ "hi": 10,
+ "lo": 10
+ },
+ "num_segments": 1,
+ "has_variant_type_in_schema": false
+}
diff --git a/be/test/olap/txn_manager_test.cpp
b/be/test/olap/txn_manager_test.cpp
index d33570e8a8d..77f1a16eb5b 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -54,6 +54,7 @@ static StorageEngine* k_engine = nullptr;
const std::string rowset_meta_path =
"./be/test/olap/test_data/rowset_meta.json";
const std::string rowset_meta_path_2 =
"./be/test/olap/test_data/rowset_meta2.json";
+const std::string rowset_meta_path_3 =
"./be/test/olap/test_data/rowset_meta3.json";
class TxnManagerTest : public testing::Test {
public:
@@ -169,6 +170,22 @@ public:
EXPECT_EQ(rowset_meta2->rowset_id(), rowset_id);
EXPECT_EQ(Status::OK(), RowsetFactory::create_rowset(_schema,
rowset_meta_path_2,
rowset_meta2,
&_rowset_diff_id));
+
+ // init rowset meta 3
+ _json_rowset_meta = "";
+ std::ifstream infile3(rowset_meta_path_3);
+ char buffer3[1024];
+ while (!infile3.eof()) {
+ infile3.getline(buffer3, 1024);
+ _json_rowset_meta = _json_rowset_meta + buffer3 + "\n";
+ }
+ _json_rowset_meta = _json_rowset_meta.substr(0,
_json_rowset_meta.size() - 1);
+ rowset_id.init(10002);
+ RowsetMetaSharedPtr rowset_meta3(new RowsetMeta());
+ rowset_meta3->init_from_json(_json_rowset_meta);
+ EXPECT_EQ(rowset_meta3->rowset_id(), rowset_id);
+ EXPECT_EQ(Status::OK(), RowsetFactory::create_rowset(_schema,
rowset_meta_path_3,
+ rowset_meta3,
&_rowset_ingested));
_tablet_uid = TabletUid(10, 10);
}
@@ -190,6 +207,7 @@ private:
RowsetSharedPtr _rowset;
RowsetSharedPtr _rowset_same_id;
RowsetSharedPtr _rowset_diff_id;
+ RowsetSharedPtr _rowset_ingested;
};
TEST_F(TxnManagerTest, PrepareNewTxn) {
@@ -363,4 +381,23 @@ TEST_F(TxnManagerTest, TabletVersionCache) {
EXPECT_EQ(tx6, 890);
}
+TEST_F(TxnManagerTest, DeleteCommittedTxnForIngestingBinlog) {
+ auto guard =
k_engine->pending_local_rowsets().add(_rowset_ingested->rowset_id());
+ auto st = _txn_mgr->commit_txn(_meta, partition_id, transaction_id,
tablet_id, _tablet_uid,
+ load_id, _rowset_ingested,
std::move(guard), false);
+ ASSERT_TRUE(st.ok()) << st;
+ RowsetMetaSharedPtr rowset_meta(new RowsetMeta());
+ st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid,
_rowset_ingested->rowset_id(),
+ rowset_meta);
+ ASSERT_TRUE(st.ok()) << st;
+ EXPECT_EQ(rowset_meta->rowset_id(), _rowset_ingested->rowset_id());
+ st = _txn_mgr->delete_txn(_meta, partition_id, transaction_id, tablet_id,
_tablet_uid);
+ ASSERT_TRUE(st.ok()) << st;
+ RowsetMetaSharedPtr rowset_meta2(new RowsetMeta());
+ st = RowsetMetaManager::get_rowset_meta(_meta, _tablet_uid,
_rowset_ingested->rowset_id(),
+ rowset_meta2);
+ ASSERT_FALSE(st.ok()) << st;
+
EXPECT_FALSE(k_engine->pending_local_rowsets().contains(_rowset_ingested->rowset_id()));
+}
+
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]