This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 45bcd1d1a3c branch-3.0: [Enhancement](Test) Add test config to recycler #44761 (#45368)
45bcd1d1a3c is described below
commit 45bcd1d1a3c4255768cf2608d8ba216b5de020cc
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Dec 13 23:33:55 2024 +0800
branch-3.0: [Enhancement](Test) Add test config to recycler #44761 (#45368)
Cherry-picked from #44761
Co-authored-by: abmdocrt <[email protected]>
---
cloud/src/common/config.h | 4 ++++
cloud/src/recycler/meta_checker.cpp | 36 ++++++++++++++++++++++++++++----
cloud/src/recycler/recycler.cpp | 41 ++++++++++++++++++++++++++-----------
3 files changed, 65 insertions(+), 16 deletions(-)
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index c6b6e1ef290..1579ef9d627 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -87,6 +87,10 @@ CONF_mInt64(recycler_sleep_before_scheduling_seconds, "60");
// log a warning if a recycle task takes longer than this duration
CONF_mInt64(recycle_task_threshold_seconds, "10800"); // 3h
+// Force the recycler to recycle all useless objects immediately, ignoring retention and expiration times.
+// **just for TEST**
+CONF_Bool(force_immediate_recycle, "false");
+
CONF_String(test_s3_ak, "");
CONF_String(test_s3_sk, "");
CONF_String(test_s3_endpoint, "");
diff --git a/cloud/src/recycler/meta_checker.cpp b/cloud/src/recycler/meta_checker.cpp
index 522015555de..f1223068d4b 100644
--- a/cloud/src/recycler/meta_checker.cpp
+++ b/cloud/src/recycler/meta_checker.cpp
@@ -25,6 +25,7 @@
#include <chrono>
#include <set>
+#include "common/logging.h"
#include "common/util.h"
#include "meta-service/keys.h"
#include "meta-service/txn_kv.h"
@@ -54,6 +55,7 @@ struct PartitionInfo {
int64_t db_id;
int64_t table_id;
int64_t partition_id;
+ int64_t tablet_id;
int64_t visible_version;
};
@@ -173,6 +175,9 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
MYSQL_ROW row = mysql_fetch_row(result);
TabletInfo tablet_info = {0};
tablet_info.tablet_id = atoll(row[0]);
+ VLOG_DEBUG << "get tablet info log"
+ << ", db name" << elem.first << ", table name"
<< table
+ << ",tablet id" << tablet_info.tablet_id;
tablet_info.schema_version = atoll(row[4]);
tablets.push_back(std::move(tablet_info));
}
@@ -201,6 +206,13 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
partition_info.db_id = atoll(row[4]);
partition_info.table_id = atoll(row[5]);
partition_info.partition_id = atoll(row[6]);
+ partition_info.tablet_id = tablet_info.tablet_id;
+ VLOG_DEBUG << "get partition info log"
+ << ", db id" << partition_info.db_id << ", table id"
+ << partition_info.table_id << ", partition id"
+ << partition_info.partition_id << ", tablet id"
+ << partition_info.tablet_id;
+
partitions.insert({partition_info.partition_id,
std::move(partition_info)});
}
}
@@ -217,9 +229,16 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
int num_row = mysql_num_rows(result);
for (int i = 0; i < num_row; ++i) {
MYSQL_ROW row = mysql_fetch_row(result);
- int partition_id = atoll(row[0]);
- int visible_version = atoll(row[2]);
+ int64_t partition_id = atoll(row[0]);
+ int64_t visible_version = atoll(row[2]);
partitions[partition_id].visible_version = visible_version;
+ VLOG_DEBUG << "get partition version log"
+ << ", db name" << elem.first << ", table name"
<< table
+ << ", raw partition id" << row[0] << ", first
partition id"
+ << partition_id << ", db id" <<
partitions[partition_id].db_id
+ << ", table id" <<
partitions[partition_id].table_id
+ << ", second partition id" <<
partitions[partition_id].partition_id
+ << ", tablet id" <<
partitions[partition_id].tablet_id;
}
}
mysql_free_result(result);
@@ -354,14 +373,23 @@ bool MetaChecker::check_fdb_by_fe_meta(MYSQL* conn) {
int64_t db_id = elem.second.db_id;
int64_t table_id = elem.second.table_id;
int64_t partition_id = elem.second.partition_id;
+ int64_t tablet_id = elem.second.tablet_id;
std::string ver_key = partition_version_key({instance_id_, db_id,
table_id, partition_id});
std::string ver_val;
err = txn->get(ver_key, &ver_val);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
- LOG(WARNING) << "version key not found, partition id: " <<
partition_id;
+ LOG_WARNING("version key not found.")
+ .tag("db id", db_id)
+ .tag("table id", table_id)
+ .tag("partition id", partition_id)
+ .tag("tablet id", tablet_id);
return false;
} else if (err != TxnErrorCode::TXN_OK) {
- LOG(WARNING) << "failed to get version: " << partition_id;
+ LOG_WARNING("failed to get version.")
+ .tag("db id", db_id)
+ .tag("table id", table_id)
+ .tag("partition id", partition_id)
+ .tag("tablet id", tablet_id);
return false;
}
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index f7000ea3792..6877d7e433b 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -24,6 +24,7 @@
#include <atomic>
#include <chrono>
+#include <cstdint>
#include <deque>
#include <string>
#include <string_view>
@@ -747,7 +748,10 @@ int InstanceRecycler::recycle_indexes() {
.tag("num_recycled", num_recycled);
});
- auto calc_expiration = [](const RecycleIndexPB& index) {
+ auto calc_expiration = [](const RecycleIndexPB& index) -> int64_t {
+ if (config::force_immediate_recycle) {
+ return 0;
+ }
int64_t expiration = index.expiration() > 0 ? index.expiration() :
index.creation_time();
int64_t retention_seconds = config::retention_seconds;
if (index.state() == RecycleIndexPB::DROPPED) {
@@ -942,7 +946,10 @@ int InstanceRecycler::recycle_partitions() {
.tag("num_recycled", num_recycled);
});
- auto calc_expiration = [](const RecyclePartitionPB& partition) {
+ auto calc_expiration = [](const RecyclePartitionPB& partition) -> int64_t {
+ if (config::force_immediate_recycle) {
+ return 0;
+ }
int64_t expiration =
partition.expiration() > 0 ? partition.expiration() :
partition.creation_time();
int64_t retention_seconds = config::retention_seconds;
@@ -1686,7 +1693,10 @@ int InstanceRecycler::recycle_rowsets() {
return 0;
};
- auto calc_expiration = [](const RecycleRowsetPB& rs) {
+ auto calc_expiration = [](const RecycleRowsetPB& rs) -> int64_t {
+ if (config::force_immediate_recycle) {
+ return 0;
+ }
// RecycleRowsetPB created by compacted or dropped rowset has no
expiration time, and will be recycled when exceed retention time
int64_t expiration = rs.expiration() > 0 ? rs.expiration() :
rs.creation_time();
int64_t retention_seconds = config::retention_seconds;
@@ -1923,8 +1933,9 @@ int InstanceRecycler::recycle_tmp_rowsets() {
// ATTN: `txn_expiration` should > 0, however we use `creation_time` +
a large `retention_time` (> 1 day in production environment)
// when `txn_expiration` <= 0 in some unexpected situation (usually
when there are bugs). This is usually safe, coz loading
// duration or timeout always < `retention_time` in practice.
- int64_t expiration =
- rowset.txn_expiration() > 0 ? rowset.txn_expiration() :
rowset.creation_time();
+ int64_t expiration = config::force_immediate_recycle ? 0
+ : rowset.txn_expiration() > 0 ?
rowset.txn_expiration()
+ :
rowset.creation_time();
VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << "
num_scanned=" << num_scanned
<< " num_expired=" << num_expired << " expiration=" <<
expiration
<< " txn_expiration=" << rowset.txn_expiration()
@@ -2106,7 +2117,7 @@ int InstanceRecycler::abort_timeout_txn() {
LOG_WARNING("malformed txn_running_pb").tag("key", hex(k));
return -1;
}
- if (txn_running_pb.timeout_time() > current_time) {
+ if (!config::force_immediate_recycle &&
txn_running_pb.timeout_time() > current_time) {
return 0;
}
++num_timeout;
@@ -2196,7 +2207,8 @@ int InstanceRecycler::recycle_expired_txn_label() {
LOG_WARNING("malformed txn_running_pb").tag("key", hex(k));
return -1;
}
- if ((recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
+ if ((config::force_immediate_recycle) ||
+ (recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
(recycle_txn_pb.creation_time() + config::label_keep_max_second *
1000L <=
current_time)) {
LOG_INFO("found recycle txn").tag("key", hex(k));
@@ -2492,14 +2504,16 @@ int InstanceRecycler::recycle_copy_jobs() {
int64_t current_time =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
if (copy_job.finish_time_ms() > 0) {
- if (current_time <
- copy_job.finish_time_ms() +
config::copy_job_max_retention_second * 1000) {
+ if (!config::force_immediate_recycle &&
+ current_time < copy_job.finish_time_ms() +
+
config::copy_job_max_retention_second * 1000) {
return 0;
}
} else {
// For compatibility, copy job does not contain finish
time before 2.2.2, use start time
- if (current_time <
- copy_job.start_time_ms() +
config::copy_job_max_retention_second * 1000) {
+ if (!config::force_immediate_recycle &&
+ current_time < copy_job.start_time_ms() +
+
config::copy_job_max_retention_second * 1000) {
return 0;
}
}
@@ -2508,7 +2522,7 @@ int InstanceRecycler::recycle_copy_jobs() {
int64_t current_time =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
// if copy job is timeout: delete all copy file kvs and copy job kv
- if (current_time <= copy_job.timeout_time_ms()) {
+ if (!config::force_immediate_recycle && current_time <=
copy_job.timeout_time_ms()) {
return 0;
}
++num_expired;
@@ -2796,6 +2810,9 @@ int InstanceRecycler::recycle_expired_stage_objects() {
int64_t expiration_time =
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
config::internal_stage_objects_expire_time_second;
+ if (config::force_immediate_recycle) {
+ expiration_time = INT64_MAX;
+ }
ret1 = accessor->delete_all(expiration_time);
if (ret1 != 0) {
LOG(WARNING) << "failed to recycle expired stage objects,
instance_id=" << instance_id_
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]