This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 28707219b94 [Fix](recycler) Further fix for #47475 (#47486)
28707219b94 is described below
commit 28707219b947c9a0ccc48468cb7cb9b252b722ba
Author: abmdocrt <[email protected]>
AuthorDate: Mon Feb 3 10:23:10 2025 +0800
[Fix](recycler) Further fix for #47475 (#47486)
Related PR: #47475
在pr#47475中,我们修复了潜在的少删数据的问题,是通过在delete rowset data函数中添加删除逻辑,删除recycle
tablet中漏删的文件,但是那个pr忽略了其中存在的一个判断条件,导致在recycle tablet中漏删的文件被跳过了,没有实际删除。
在此pr中,tmp
rowset不再依赖于上述判断条件,尽可能删除每一个rowset数据,包括倒排索引v2的数据,但是普通的rowset不会跳过这个判断条件,因为普通rowset数量过大,如果不跳过,可能会影响删除效率。
---
cloud/src/recycler/hdfs_accessor.cpp | 1 +
cloud/src/recycler/recycler.cpp | 24 +++++++++++++++---------
cloud/src/recycler/recycler.h | 8 +++++++-
cloud/test/recycler_test.cpp | 10 ++++++++--
4 files changed, 31 insertions(+), 12 deletions(-)
diff --git a/cloud/src/recycler/hdfs_accessor.cpp
b/cloud/src/recycler/hdfs_accessor.cpp
index 1999bcfa165..024acd0efe7 100644
--- a/cloud/src/recycler/hdfs_accessor.cpp
+++ b/cloud/src/recycler/hdfs_accessor.cpp
@@ -356,6 +356,7 @@ std::string extract_parent_path(const std::string& path) {
}
int HdfsAccessor::init() {
+ TEST_SYNC_POINT_RETURN_WITH_VALUE("HdfsAccessor::init.hdfs_init_failed",
(int)-1);
// TODO(plat1ko): Cache hdfsFS
fs_ = HDFSBuilder::create_fs(info_.build_conf());
if (!fs_) {
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index c7c9d8e0a02..d8bbfa15fc0 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -1464,7 +1464,8 @@ int InstanceRecycler::delete_rowset_data(const
doris::RowsetMetaCloudPB& rs_meta
return accessor->delete_files(file_paths);
}
-int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaCloudPB>& rowsets) {
+int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaCloudPB>& rowsets,
+ RowsetRecyclingState type) {
int ret = 0;
// resource_id -> file_paths
std::map<std::string, std::vector<std::string>> resource_file_paths;
@@ -1472,7 +1473,9 @@ int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaClou
std::vector<std::tuple<std::string, int64_t, std::string>>
rowsets_delete_by_prefix;
for (const auto& rs : rowsets) {
- {
+ // we have to treat tmp rowset as "orphans" that may not be related to
any existing tablets
+ // due to aborted schema change.
+ if (type == RowsetRecyclingState::FORMAL_ROWSET) {
std::lock_guard lock(recycled_tablets_mtx_);
if (recycled_tablets_.count(rs.tablet_id())) {
continue; // Rowset data has already been deleted
@@ -1499,7 +1502,7 @@ int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaClou
std::vector<std::pair<int64_t, std::string>> index_ids;
// default format as v1.
InvertedIndexStorageFormatPB index_format =
InvertedIndexStorageFormatPB::V1;
-
+ int inverted_index_get_ret = 0;
if (rs.has_tablet_schema()) {
for (const auto& index : rs.tablet_schema().index()) {
if (index.has_index_type() && index.index_type() ==
IndexType::INVERTED) {
@@ -1519,12 +1522,12 @@ int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaClou
continue;
}
InvertedIndexInfo index_info;
- int get_ret =
+ inverted_index_get_ret =
inverted_index_id_cache_->get(rs.index_id(),
rs.schema_version(), index_info);
- if (get_ret == 0) {
+ if (inverted_index_get_ret == 0) {
index_format = index_info.first;
index_ids = index_info.second;
- } else if (get_ret == 1) {
+ } else if (inverted_index_get_ret == 1) {
// 1. Schema kv not found means tablet has been recycled
// Maybe some tablet recycle failed by some bugs
// We need to delete again to double check
@@ -1562,7 +1565,10 @@ int InstanceRecycler::delete_rowset_data(const
std::vector<doris::RowsetMetaClou
file_paths.push_back(inverted_index_path_v1(tablet_id,
rowset_id, i,
index_id.first, index_id.second));
}
- } else if (!index_ids.empty()) {
+ } else if (!index_ids.empty() || inverted_index_get_ret == 1) {
+ // try to recycle inverted index v2 when inverted_index_get_ret == 1
+ // we treat "schema kv not found" as if the rowset has a v2 format inverted
index
+ // to reduce the chance of data leakage
file_paths.push_back(inverted_index_path_v2(tablet_id,
rowset_id, i));
}
}
@@ -2028,7 +2034,7 @@ int InstanceRecycler::recycle_rowsets() {
rowsets_to_delete.swap(rowsets);
worker_pool->submit([&, rowset_keys_to_delete =
std::move(rowset_keys_to_delete),
rowsets_to_delete =
std::move(rowsets_to_delete)]() {
- if (delete_rowset_data(rowsets_to_delete) != 0) {
+ if (delete_rowset_data(rowsets_to_delete,
RowsetRecyclingState::FORMAL_ROWSET) != 0) {
LOG(WARNING) << "failed to delete rowset data, instance_id="
<< instance_id_;
return;
}
@@ -2225,7 +2231,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
tmp_rowset_keys.clear();
tmp_rowsets.clear();
});
- if (delete_rowset_data(tmp_rowsets) != 0) {
+ if (delete_rowset_data(tmp_rowsets, RowsetRecyclingState::TMP_ROWSET)
!= 0) {
LOG(WARNING) << "failed to delete tmp rowset data, instance_id="
<< instance_id_;
return -1;
}
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index cf23dcacd2f..84e4075e61b 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -110,6 +110,11 @@ private:
std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
};
+enum class RowsetRecyclingState {
+ FORMAL_ROWSET,
+ TMP_ROWSET,
+};
+
class InstanceRecycler {
public:
explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const
InstanceInfoPB& instance,
@@ -222,7 +227,8 @@ private:
const std::string& rowset_id);
// return 0 for success otherwise error
- int delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>&
rowsets);
+ int delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>&
rowsets,
+ RowsetRecyclingState type);
/**
* Get stage storage info from instance and init StorageVaultAccessor
diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp
index 567d27f5d6f..f47c50023b8 100644
--- a/cloud/test/recycler_test.cpp
+++ b/cloud/test/recycler_test.cpp
@@ -1129,6 +1129,7 @@ TEST(RecyclerTest, recycle_indexes) {
j & 1);
auto tmp_rowset = create_rowset("recycle_tmp_rowsets", tablet_id,
index_id, 5,
schemas[j % 5], txn_id_base + j);
+ tmp_rowset.set_resource_id("recycle_indexes");
create_tmp_rowset(txn_kv.get(), accessor.get(), tmp_rowset, j & 1);
}
for (int j = 0; j < 10; ++j) {
@@ -3132,7 +3133,7 @@ TEST(RecyclerTest, delete_rowset_data) {
rowset_pbs.emplace_back(std::move(rowset));
}
- ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs));
+ ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::FORMAL_ROWSET));
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_all(&list_iter));
ASSERT_FALSE(list_iter->has_next());
@@ -3237,7 +3238,7 @@ TEST(RecyclerTest,
delete_rowset_data_without_inverted_index_storage_format) {
rowset_pbs.emplace_back(std::move(rowset));
}
- ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs));
+ ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs,
RowsetRecyclingState::FORMAL_ROWSET));
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_all(&list_iter));
ASSERT_FALSE(list_iter->has_next());
@@ -3352,6 +3353,11 @@ TEST(RecyclerTest, init_vault_accessor_failed_test) {
rs = resp->add_rowset_meta();
rs->set_resource_id("success_vault");
});
+ sp->set_call_back("HdfsAccessor::init.hdfs_init_failed", [](auto&& args) {
+ auto* ret = try_any_cast_ret<int>(args);
+ ret->first = -1;
+ ret->second = true;
+ });
sp->enable_processing();
// succeed to init MockAccessor
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]