This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 17ae38b7356 branch-3.0: [opt](recycler) Improve robustness and
observability #45617 (#45856)
17ae38b7356 is described below
commit 17ae38b73565921c5aa7b2ff32bc4b8f2ce3dbda
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 25 09:37:19 2024 +0800
branch-3.0: [opt](recycler) Improve robustness and observability #45617
(#45856)
Cherry-picked from #45617
Co-authored-by: Gavin Chou <[email protected]>
---
cloud/src/common/bvars.cpp | 160 ++++++---------
cloud/src/common/bvars.h | 96 ++++-----
cloud/src/common/config.h | 2 +
cloud/src/meta-service/meta_service.cpp | 1 +
cloud/src/recycler/obj_storage_client.h | 10 +-
cloud/src/recycler/recycler.cpp | 341 +++++++++++++++++++++-----------
cloud/src/recycler/recycler.h | 20 +-
cloud/src/recycler/s3_accessor.cpp | 42 +++-
cloud/src/recycler/s3_obj_client.cpp | 7 +-
9 files changed, 398 insertions(+), 281 deletions(-)
diff --git a/cloud/src/common/bvars.cpp b/cloud/src/common/bvars.cpp
index 746f109ac6d..a0b0a2da9c2 100644
--- a/cloud/src/common/bvars.cpp
+++ b/cloud/src/common/bvars.cpp
@@ -20,6 +20,8 @@
#include <cstdint>
#include <stdexcept>
+// clang-format off
+
// meta-service's bvars
BvarLatencyRecorderWithTag g_bvar_ms_begin_txn("ms", "begin_txn");
BvarLatencyRecorderWithTag g_bvar_ms_precommit_txn("ms", "precommit_txn");
@@ -71,23 +73,27 @@ BvarLatencyRecorderWithTag g_bvar_ms_get_copy_files("ms",
"get_copy_files");
BvarLatencyRecorderWithTag g_bvar_ms_filter_copy_files("ms",
"filter_copy_files");
BvarLatencyRecorderWithTag g_bvar_ms_update_delete_bitmap("ms",
"update_delete_bitmap");
BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap("ms",
"get_delete_bitmap");
-BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms",
-
"get_delete_bitmap_update_lock");
+BvarLatencyRecorderWithTag g_bvar_ms_get_delete_bitmap_update_lock("ms",
"get_delete_bitmap_update_lock");
BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap("ms",
"remove_delete_bitmap");
-BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock(
- "ms", "remove_delete_bitmap_update_lock");
+BvarLatencyRecorderWithTag g_bvar_ms_remove_delete_bitmap_update_lock("ms",
"remove_delete_bitmap_update_lock");
BvarLatencyRecorderWithTag g_bvar_ms_get_instance("ms", "get_instance");
BvarLatencyRecorderWithTag g_bvar_ms_get_rl_task_commit_attach("ms",
"get_rl_task_commit_attach");
BvarLatencyRecorderWithTag g_bvar_ms_reset_rl_progress("ms",
"reset_rl_progress");
BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id("ms", "get_txn_id");
-
BvarLatencyRecorderWithTag g_bvar_ms_start_tablet_job("ms",
"start_tablet_job");
BvarLatencyRecorderWithTag g_bvar_ms_finish_tablet_job("ms",
"finish_tablet_job");
BvarLatencyRecorderWithTag g_bvar_ms_get_cluster_status("ms",
"get_cluster_status");
BvarLatencyRecorderWithTag g_bvar_ms_set_cluster_status("ms",
"set_cluster_status");
-
BvarLatencyRecorderWithTag g_bvar_ms_check_kv("ms", "check_kv");
+// recycler's bvars
+// TODO: use mbvar for per instance,
https://github.com/apache/brpc/blob/master/docs/cn/mbvar_c++.md
+BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_index_earlest_ts("recycler",
"recycle_index_earlest_ts");
+BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_partition_earlest_ts("recycler",
"recycle_partition_earlest_ts");
+BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_rowset_earlest_ts("recycler",
"recycle_rowset_earlest_ts");
+BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_tmp_rowset_earlest_ts("recycler",
"recycle_tmp_rowset_earlest_ts");
+BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_expired_txn_label_earlest_ts("recycler",
"recycle_expired_txn_label_earlest_ts");
+
// txn_kv's bvars
bvar::LatencyRecorder g_bvar_txn_kv_get("txn_kv", "get");
bvar::LatencyRecorder g_bvar_txn_kv_range_get("txn_kv", "range_get");
@@ -101,107 +107,65 @@ bvar::LatencyRecorder
g_bvar_txn_kv_range_remove("txn_kv", "range_remove");
bvar::LatencyRecorder g_bvar_txn_kv_get_read_version("txn_kv",
"get_read_version");
bvar::LatencyRecorder g_bvar_txn_kv_get_committed_version("txn_kv",
"get_committed_version");
bvar::LatencyRecorder g_bvar_txn_kv_batch_get("txn_kv", "batch_get");
-
bvar::Adder<int64_t> g_bvar_txn_kv_get_count_normalized("txn_kv",
"get_count_normalized");
-
bvar::Adder<int64_t> g_bvar_txn_kv_commit_error_counter;
-bvar::Window<bvar::Adder<int64_t> > g_bvar_txn_kv_commit_error_counter_minute(
- "txn_kv", "commit_error", &g_bvar_txn_kv_commit_error_counter, 60);
-
+bvar::Window<bvar::Adder<int64_t> >
g_bvar_txn_kv_commit_error_counter_minute("txn_kv", "commit_error",
&g_bvar_txn_kv_commit_error_counter, 60);
bvar::Adder<int64_t> g_bvar_txn_kv_commit_conflict_counter;
-bvar::Window<bvar::Adder<int64_t> >
g_bvar_txn_kv_commit_conflict_counter_minute(
- "txn_kv", "commit_conflict", &g_bvar_txn_kv_commit_conflict_counter,
60);
+bvar::Window<bvar::Adder<int64_t> >
g_bvar_txn_kv_commit_conflict_counter_minute("txn_kv", "commit_conflict",
&g_bvar_txn_kv_commit_conflict_counter, 60);
+// fdb's bvars
const int64_t BVAR_FDB_INVALID_VALUE = -99999999L;
bvar::Status<int64_t> g_bvar_fdb_client_count("fdb_client_count",
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_configuration_coordinators_count(
- "fdb_configuration_coordinators_count", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_configuration_usable_regions("fdb_configuration_usable_regions",
-
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_coordinators_unreachable_count(
- "fdb_coordinators_unreachable_count", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_fault_tolerance_count("fdb_fault_tolerance_count",
- BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_average_partition_size_bytes(
- "fdb_data_average_partition_size_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_data_log_server_space_bytes("fdb_data_log_server_space_bytes",
-
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_moving_data_highest_priority(
- "fdb_data_moving_data_highest_priority", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_moving_data_in_flight_bytes(
- "fdb_data_moving_data_in_flight_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_moving_data_in_queue_bytes(
- "fdb_data_moving_data_in_queue_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_moving_total_written_bytes(
- "fdb_data_moving_total_written_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_data_partition_count("fdb_data_partition_count",
- BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_storage_server_space_bytes(
- "fdb_data_storage_server_space_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_data_state_min_replicas_remaining(
- "fdb_data_state_min_replicas_remaining", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_data_total_kv_size_bytes("fdb_data_total_kv_size_bytes",
-
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_data_total_disk_used_bytes("fdb_data_total_disk_used_bytes",
-
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_configuration_coordinators_count("fdb_configuration_coordinators_count",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_configuration_usable_regions("fdb_configuration_usable_regions",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_coordinators_unreachable_count("fdb_coordinators_unreachable_count",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_fault_tolerance_count("fdb_fault_tolerance_count",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_data_average_partition_size_bytes("fdb_data_average_partition_size_bytes",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_data_log_server_space_bytes("fdb_data_log_server_space_bytes",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_data_moving_data_highest_priority("fdb_data_moving_data_highest_priority",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_data_moving_data_in_flight_bytes("fdb_data_moving_data_in_flight_bytes",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_data_moving_data_in_queue_bytes("fdb_data_moving_data_in_queue_bytes",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_data_moving_total_written_bytes("fdb_data_moving_total_written_bytes",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_data_partition_count("fdb_data_partition_count",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_data_storage_server_space_bytes("fdb_data_storage_server_space_bytes",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_data_state_min_replicas_remaining("fdb_data_state_min_replicas_remaining",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_data_total_kv_size_bytes("fdb_data_total_kv_size_bytes",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_data_total_disk_used_bytes("fdb_data_total_disk_used_bytes",
BVAR_FDB_INVALID_VALUE);
bvar::Status<int64_t> g_bvar_fdb_generation("fdb_generation",
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_incompatible_connections("fdb_incompatible_connections",
-
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_latency_probe_transaction_start_ns(
- "fdb_latency_probe_transaction_start_ns", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_latency_probe_commit_ns("fdb_latency_probe_commit_ns",
-
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_latency_probe_read_ns("fdb_latency_probe_read_ns",
- BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_incompatible_connections("fdb_incompatible_connections",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_latency_probe_transaction_start_ns("fdb_latency_probe_transaction_start_ns",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_latency_probe_commit_ns("fdb_latency_probe_commit_ns",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_latency_probe_read_ns("fdb_latency_probe_read_ns",
BVAR_FDB_INVALID_VALUE);
bvar::Status<int64_t> g_bvar_fdb_machines_count("fdb_machines_count",
BVAR_FDB_INVALID_VALUE);
bvar::Status<int64_t> g_bvar_fdb_process_count("fdb_process_count",
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_qos_worst_data_lag_storage_server_ns(
- "fdb_qos_worst_data_lag_storage_server_ns", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_qos_worst_durability_lag_storage_server_ns(
- "fdb_qos_worst_durability_lag_storage_server_ns",
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_qos_worst_log_server_queue_bytes(
- "fdb_qos_worst_log_server_queue_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_qos_worst_storage_server_queue_bytes(
- "fdb_qos_worst_storage_server_queue_bytes", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_workload_conflict_rate_hz("fdb_workload_conflict_rate_hz",
-
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_workload_location_rate_hz("fdb_workload_location_rate_hz",
-
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_workload_keys_read_hz("fdb_workload_keys_read_hz",
- BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_workload_read_bytes_hz("fdb_workload_read_bytes_hz",
-
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_workload_read_rate_hz("fdb_workload_read_rate_hz",
- BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_workload_write_rate_hz("fdb_workload_write_rate_hz",
-
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t>
g_bvar_fdb_workload_written_bytes_hz("fdb_workload_written_bytes_hz",
-
BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_workload_transactions_started_hz(
- "fdb_workload_transactions_started_hz", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_workload_transactions_committed_hz(
- "fdb_workload_transactions_committed_hz", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_workload_transactions_rejected_hz(
- "fdb_workload_transactions_rejected_hz", BVAR_FDB_INVALID_VALUE);
-bvar::Status<int64_t> g_bvar_fdb_client_thread_busyness_percent(
- "fdb_client_thread_busyness_percent", BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_qos_worst_data_lag_storage_server_ns("fdb_qos_worst_data_lag_storage_server_ns",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_qos_worst_durability_lag_storage_server_ns("fdb_qos_worst_durability_lag_storage_server_ns",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_qos_worst_log_server_queue_bytes("fdb_qos_worst_log_server_queue_bytes",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_qos_worst_storage_server_queue_bytes("fdb_qos_worst_storage_server_queue_bytes",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_workload_conflict_rate_hz("fdb_workload_conflict_rate_hz",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_workload_location_rate_hz("fdb_workload_location_rate_hz",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_workload_keys_read_hz("fdb_workload_keys_read_hz",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_workload_read_bytes_hz("fdb_workload_read_bytes_hz",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_workload_read_rate_hz("fdb_workload_read_rate_hz",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_workload_write_rate_hz("fdb_workload_write_rate_hz",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_workload_written_bytes_hz("fdb_workload_written_bytes_hz",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_workload_transactions_started_hz("fdb_workload_transactions_started_hz",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_workload_transactions_committed_hz("fdb_workload_transactions_committed_hz",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_workload_transactions_rejected_hz("fdb_workload_transactions_rejected_hz",
BVAR_FDB_INVALID_VALUE);
+bvar::Status<int64_t>
g_bvar_fdb_client_thread_busyness_percent("fdb_client_thread_busyness_percent",
BVAR_FDB_INVALID_VALUE);
// checker's bvars
-BvarStatusWithTag<long> g_bvar_checker_num_scanned("checker", "num_scanned");
-BvarStatusWithTag<long> g_bvar_checker_num_scanned_with_segment("checker",
-
"num_scanned_with_segment");
-BvarStatusWithTag<long> g_bvar_checker_num_check_failed("checker",
"num_check_failed");
-BvarStatusWithTag<long> g_bvar_checker_check_cost_s("checker",
"check_cost_seconds");
-BvarStatusWithTag<long> g_bvar_checker_enqueue_cost_s("checker",
"enqueue_cost_seconds");
-BvarStatusWithTag<long> g_bvar_checker_last_success_time_ms("checker",
"last_success_time_ms");
-BvarStatusWithTag<long> g_bvar_checker_instance_volume("checker",
"instance_volume");
-BvarStatusWithTag<long> g_bvar_inverted_checker_num_scanned("checker",
"num_inverted_scanned");
-BvarStatusWithTag<long> g_bvar_inverted_checker_num_check_failed("checker",
-
"num_inverted_check_failed");
+BvarStatusWithTag<int64_t> g_bvar_checker_num_scanned("checker",
"num_scanned");
+BvarStatusWithTag<int64_t> g_bvar_checker_num_scanned_with_segment("checker",
"num_scanned_with_segment");
+BvarStatusWithTag<int64_t> g_bvar_checker_num_check_failed("checker",
"num_check_failed");
+BvarStatusWithTag<int64_t> g_bvar_checker_check_cost_s("checker",
"check_cost_seconds");
+BvarStatusWithTag<int64_t> g_bvar_checker_enqueue_cost_s("checker",
"enqueue_cost_seconds");
+BvarStatusWithTag<int64_t> g_bvar_checker_last_success_time_ms("checker",
"last_success_time_ms");
+BvarStatusWithTag<int64_t> g_bvar_checker_instance_volume("checker",
"instance_volume");
+BvarStatusWithTag<int64_t> g_bvar_inverted_checker_num_scanned("checker",
"num_inverted_scanned");
+BvarStatusWithTag<int64_t> g_bvar_inverted_checker_num_check_failed("checker",
"num_inverted_check_failed");
+BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_leaked_delete_bitmaps("checker",
"leaked_delete_bitmaps");
+BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_abnormal_delete_bitmaps("checker",
"abnormal_delete_bitmaps");
+BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_delete_bitmaps_scanned("checker",
"delete_bitmap_keys_scanned");
-BvarStatusWithTag<int64_t>
g_bvar_inverted_checker_leaked_delete_bitmaps("checker",
-
"leaked_delete_bitmaps");
-BvarStatusWithTag<int64_t> g_bvar_inverted_checker_abnormal_delete_bitmaps(
- "checker", "abnormal_delete_bitmaps");
-BvarStatusWithTag<int64_t> g_bvar_inverted_checker_delete_bitmaps_scanned(
- "checker", "delete_bitmap_keys_scanned");
\ No newline at end of file
+// clang-format on
diff --git a/cloud/src/common/bvars.h b/cloud/src/common/bvars.h
index d0ad2e97957..93340a6c0d2 100644
--- a/cloud/src/common/bvars.h
+++ b/cloud/src/common/bvars.h
@@ -26,34 +26,53 @@
#include <memory>
#include <mutex>
#include <string>
-
-class BvarLatencyRecorderWithTag {
+#include <type_traits>
+
+/**
+ * Manage bvars that with similar names (identical prefix)
+ * ${module}_${name}_${tag}
+ * where `tag` is added automatically when calling `get` or `put`
+ */
+template <typename Bvar, bool is_status = false>
+class BvarWithTag {
public:
- BvarLatencyRecorderWithTag(std::string module, std::string name)
+ BvarWithTag(std::string module, std::string name)
: module_(std::move(module)), name_(std::move(name)) {}
- void put(const std::string& tag, int64_t value) {
- std::shared_ptr<bvar::LatencyRecorder> instance = nullptr;
+ template <typename ValType>
+ requires std::is_integral_v<ValType>
+ void put(const std::string& tag, ValType value) {
+ std::shared_ptr<Bvar> instance = nullptr;
{
std::lock_guard<bthread::Mutex> l(mutex_);
auto it = bvar_map_.find(tag);
if (it == bvar_map_.end()) {
- instance = std::make_shared<bvar::LatencyRecorder>(module_,
name_ + "_" + tag);
+ instance = std::make_shared<Bvar>(module_, name_ + "_" + tag,
ValType());
bvar_map_[tag] = instance;
} else {
instance = it->second;
}
}
- (*instance) << value;
+ // FIXME(gavin): check bvar::Adder and more
+ if constexpr (std::is_same_v<Bvar, bvar::LatencyRecorder>) {
+ (*instance) << value;
+ } else if constexpr (is_status) {
+ instance->set_value(value);
+ } else {
+ // This branch is meant to be unreachable, add an assert(false) here to
+ // prevent missing branch match.
+ // Postpone deduction of static_assert by evaluating sizeof(T)
+ static_assert(!sizeof(Bvar), "all types must be matched with if
constexpr");
+ }
}
- std::shared_ptr<bvar::LatencyRecorder> get(const std::string& tag) {
- std::shared_ptr<bvar::LatencyRecorder> instance = nullptr;
+ std::shared_ptr<Bvar> get(const std::string& tag) {
+ std::shared_ptr<Bvar> instance = nullptr;
std::lock_guard<bthread::Mutex> l(mutex_);
auto it = bvar_map_.find(tag);
if (it == bvar_map_.end()) {
- instance = std::make_shared<bvar::LatencyRecorder>(module_, name_
+ "_" + tag);
+ instance = std::make_shared<Bvar>(module_, name_ + "_" + tag);
bvar_map_[tag] = instance;
return instance;
}
@@ -69,54 +88,14 @@ private:
bthread::Mutex mutex_;
std::string module_;
std::string name_;
- std::map<std::string, std::shared_ptr<bvar::LatencyRecorder>> bvar_map_;
+ std::map<std::string, std::shared_ptr<Bvar>> bvar_map_;
};
-template <class T>
-class BvarStatusWithTag {
-public:
- BvarStatusWithTag(std::string module, std::string name)
- : module_(std::move(module)), name_(std::move(name)) {}
-
- void put(const std::string& tag, T value) {
- std::shared_ptr<bvar::Status<T>> instance = nullptr;
- {
- std::lock_guard<bthread::Mutex> l(mutex_);
- auto it = bvar_map_.find(tag);
- if (it == bvar_map_.end()) {
- instance = std::make_shared<bvar::Status<T>>(module_, name_ +
"_" + tag, T());
- bvar_map_[tag] = instance;
- } else {
- instance = it->second;
- }
- }
- (*instance).set_value(value);
- }
-
- std::shared_ptr<bvar::Status<T>> get(const std::string& tag) {
- std::shared_ptr<bvar::Status<T>> instance = nullptr;
- std::lock_guard<bthread::Mutex> l(mutex_);
-
- auto it = bvar_map_.find(tag);
- if (it == bvar_map_.end()) {
- instance = std::make_shared<bvar::Status<T>>(module_, name_ + "_"
+ tag);
- bvar_map_[tag] = instance;
- return instance;
- }
- return it->second;
- }
-
- void remove(const std::string& tag) {
- std::lock_guard<bthread::Mutex> l(mutex_);
- bvar_map_.erase(tag);
- }
+using BvarLatencyRecorderWithTag = BvarWithTag<bvar::LatencyRecorder>;
-private:
- bthread::Mutex mutex_;
- std::string module_;
- std::string name_;
- std::map<std::string, std::shared_ptr<bvar::Status<T>>> bvar_map_;
-};
+template <typename T>
+ requires std::is_integral_v<T>
+using BvarStatusWithTag = BvarWithTag<bvar::Status<T>, true>;
// meta-service's bvars
extern BvarLatencyRecorderWithTag g_bvar_ms_begin_txn;
@@ -182,6 +161,13 @@ extern BvarLatencyRecorderWithTag
g_bvar_ms_reset_rl_progress;
extern BvarLatencyRecorderWithTag g_bvar_ms_get_txn_id;
extern BvarLatencyRecorderWithTag g_bvar_ms_check_kv;
+// recycler's bvars
+extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_index_earlest_ts;
+extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_partition_earlest_ts;
+extern BvarStatusWithTag<int64_t> g_bvar_recycler_recycle_rowset_earlest_ts;
+extern BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_tmp_rowset_earlest_ts;
+extern BvarStatusWithTag<int64_t>
g_bvar_recycler_recycle_expired_txn_label_earlest_ts;
+
// txn_kv's bvars
extern bvar::LatencyRecorder g_bvar_txn_kv_get;
extern bvar::LatencyRecorder g_bvar_txn_kv_range_get;
diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index d2ee284d595..ec8b1ce9903 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -66,9 +66,11 @@ CONF_mInt64(dropped_partition_retention_seconds, "10800");
// 3h
CONF_Strings(recycle_whitelist, ""); // Comma seprated list
// These instances will not be recycled, only effective when whitelist is
empty.
CONF_Strings(recycle_blacklist, ""); // Comma seprated list
+// IO worker thread pool concurrency: object list, delete
CONF_mInt32(instance_recycler_worker_pool_size, "32");
CONF_Bool(enable_checker, "false");
// The parallelism for parallel recycle operation
+// s3_producer_pool recycle_tablet_pool, delete single object in this pool
CONF_Int32(recycle_pool_parallelism, "40");
// Currently only used for recycler test
CONF_Bool(enable_inverted_check, "false");
diff --git a/cloud/src/meta-service/meta_service.cpp
b/cloud/src/meta-service/meta_service.cpp
index 18a1eefce8f..9af2b785904 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -1183,6 +1183,7 @@ void
MetaServiceImpl::commit_rowset(::google::protobuf::RpcController* controlle
<< ", rowset_id=" << rowset_id
<< ", rowset_meta_bytes=" <<
rowset_meta.ByteSizeLong()
<< ", segment_key_bounds_bytes=" <<
segment_key_bounds_bytes
+ << ", num_segments=" << rowset_meta.num_segments()
<< ", rowset_meta=" << rowset_meta.ShortDebugString();
}
code = cast_as<ErrCategory::COMMIT>(err);
diff --git a/cloud/src/recycler/obj_storage_client.h
b/cloud/src/recycler/obj_storage_client.h
index fc0211820d1..b3d5cd4978e 100644
--- a/cloud/src/recycler/obj_storage_client.h
+++ b/cloud/src/recycler/obj_storage_client.h
@@ -30,9 +30,15 @@ struct ObjectStoragePathRef {
};
struct ObjectStorageResponse {
- ObjectStorageResponse(int r = 0, std::string msg = "") : ret(r),
error_msg(std::move(msg)) {}
+ enum Code : int {
+ UNDEFINED = -1,
+ OK = 0,
+ NOT_FOUND = 1,
+ };
+
+ ObjectStorageResponse(int r = OK, std::string msg = "") : ret(r),
error_msg(std::move(msg)) {}
// clang-format off
- int ret {0}; // To unify the error handle logic with BE, we'd better use
the same error code as BE
+ int ret {OK}; // To unify the error handle logic with BE, we'd better use
the same error code as BE
// clang-format on
std::string error_msg;
};
diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp
index ca22b28e031..84d755958ee 100644
--- a/cloud/src/recycler/recycler.cpp
+++ b/cloud/src/recycler/recycler.cpp
@@ -40,6 +40,7 @@
#ifdef UNIT_TEST
#include "../test/mock_accessor.h"
#endif
+#include "common/bvars.h"
#include "common/config.h"
#include "common/encryption_util.h"
#include "common/logging.h"
@@ -576,11 +577,11 @@ int InstanceRecycler::init() {
template <typename... Func>
auto task_wrapper(Func... funcs) -> std::function<int()> {
return [funcs...]() {
- return [](std::initializer_list<int> numbers) {
+ return [](std::initializer_list<int> ret_vals) {
int i = 0;
- for (int num : numbers) {
- if (num != 0) {
- i = num;
+ for (int ret : ret_vals) {
+ if (ret != 0) {
+ i = ret;
}
}
return i;
@@ -597,11 +598,15 @@ int InstanceRecycler::do_recycle() {
fmt::format("instance id {}",
instance_id_),
[](int r) { return r != 0; });
sync_executor
- .add(task_wrapper(
+ .add(task_wrapper( // dropped table and dropped partition need
to be recycled in series
+ // because they may both recycle the same
set of tablets
+ // recycle dropped table or indexes (mv, rollup)
[this]() -> int { return
InstanceRecycler::recycle_indexes(); },
- [this]() -> int { return
InstanceRecycler::recycle_partitions(); },
- [this]() -> int { return
InstanceRecycler::recycle_tmp_rowsets(); },
- [this]() -> int { return
InstanceRecycler::recycle_rowsets(); }))
+ // recycle dropped partitions
+ [this]() -> int { return
InstanceRecycler::recycle_partitions(); }))
+ .add(task_wrapper(
+ [this]() -> int { return
InstanceRecycler::recycle_tmp_rowsets(); }))
+ .add(task_wrapper([this]() -> int { return
InstanceRecycler::recycle_rowsets(); }))
.add(task_wrapper(
[this]() { return
InstanceRecycler::abort_timeout_txn(); },
[this]() { return
InstanceRecycler::recycle_expired_txn_label(); }))
@@ -625,6 +630,11 @@ int InstanceRecycler::do_recycle() {
}
}
+/**
+ * 1. delete all remote data
+ * 2. delete all kv
+ * 3. remove instance kv
+ */
int InstanceRecycler::recycle_deleted_instance() {
LOG_INFO("begin to recycle deleted instance").tag("instance_id",
instance_id_);
@@ -638,6 +648,29 @@ int InstanceRecycler::recycle_deleted_instance() {
<< "s, instance_id=" << instance_id_;
});
+ // delete all remote data
+ for (auto& [_, accessor] : accessor_map_) {
+ if (stopped()) {
+ return ret;
+ }
+
+ LOG(INFO) << "begin to delete all objects in " << accessor->uri();
+ int del_ret = accessor->delete_all();
+ if (del_ret == 0) {
+ LOG(INFO) << "successfully delete all objects in " <<
accessor->uri();
+ } else if (del_ret != 1) { // no need to log, because S3Accessor has
logged this error
+ // If `del_ret == 1`, it can be considered that the object data
has been recycled by cloud platform,
+ // so the recycling has been successful.
+ ret = -1;
+ }
+ }
+
+ if (ret != 0) {
+ LOG(WARNING) << "failed to delete all data of deleted instance=" <<
instance_id_;
+ return ret;
+ }
+
+ // delete all kv
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv_->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
@@ -681,22 +714,6 @@ int InstanceRecycler::recycle_deleted_instance() {
ret = -1;
}
- for (auto& [_, accessor] : accessor_map_) {
- if (stopped()) {
- return ret;
- }
-
- LOG(INFO) << "begin to delete all objects in " << accessor->uri();
- int del_ret = accessor->delete_all();
- if (del_ret == 0) {
- LOG(INFO) << "successfully delete all objects in " <<
accessor->uri();
- } else if (del_ret != 1) { // no need to log, because S3Accessor has
logged this error
- // If `del_ret == 1`, it can be considered that the object data
has been recycled by cloud platform,
- // so the recycling has been successful.
- ret = -1;
- }
- }
-
if (ret == 0) {
// remove instance kv
// ATTN: MUST ensure that cloud platform won't regenerate the same
instance id
@@ -721,9 +738,9 @@ int InstanceRecycler::recycle_deleted_instance() {
int InstanceRecycler::recycle_indexes() {
const std::string task_name = "recycle_indexes";
- int num_scanned = 0;
- int num_expired = 0;
- int num_recycled = 0;
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
RecycleIndexKeyInfo index_key_info0 {instance_id_, 0};
RecycleIndexKeyInfo index_key_info1 {instance_id_, INT64_MAX};
@@ -748,9 +765,11 @@ int InstanceRecycler::recycle_indexes() {
.tag("num_recycled", num_recycled);
});
- auto calc_expiration = [](const RecycleIndexPB& index) -> int64_t {
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const RecycleIndexPB& index) {
if (config::force_immediate_recycle) {
- return 0;
+ return 0L;
}
int64_t expiration = index.expiration() > 0 ? index.expiration() :
index.creation_time();
int64_t retention_seconds = config::retention_seconds;
@@ -758,7 +777,12 @@ int InstanceRecycler::recycle_indexes() {
retention_seconds =
std::min(config::dropped_index_retention_seconds,
retention_seconds);
}
- return expiration + retention_seconds;
+ int64_t final_expiration = expiration + retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_index_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
};
// Elements in `index_keys` has the same lifetime as `it` in
`scan_and_recycle`
@@ -919,9 +943,9 @@ bool check_lazy_txn_finished(std::shared_ptr<TxnKv> txn_kv,
const std::string in
int InstanceRecycler::recycle_partitions() {
const std::string task_name = "recycle_partitions";
- int num_scanned = 0;
- int num_expired = 0;
- int num_recycled = 0;
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
RecyclePartKeyInfo part_key_info0 {instance_id_, 0};
RecyclePartKeyInfo part_key_info1 {instance_id_, INT64_MAX};
@@ -946,9 +970,11 @@ int InstanceRecycler::recycle_partitions() {
.tag("num_recycled", num_recycled);
});
- auto calc_expiration = [](const RecyclePartitionPB& partition) -> int64_t {
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const RecyclePartitionPB&
partition) {
if (config::force_immediate_recycle) {
- return 0;
+ return 0L;
}
int64_t expiration =
partition.expiration() > 0 ? partition.expiration() :
partition.creation_time();
@@ -957,7 +983,12 @@ int InstanceRecycler::recycle_partitions() {
retention_seconds =
std::min(config::dropped_partition_retention_seconds,
retention_seconds);
}
- return expiration + retention_seconds;
+ int64_t final_expiration = expiration + retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_partition_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
};
// Elements in `partition_keys` has the same lifetime as `it` in
`scan_and_recycle`
@@ -1074,8 +1105,8 @@ int InstanceRecycler::recycle_partitions() {
}
int InstanceRecycler::recycle_versions() {
- int num_scanned = 0;
- int num_recycled = 0;
+ int64_t num_scanned = 0;
+ int64_t num_recycled = 0;
LOG_INFO("begin to recycle table and partition
versions").tag("instance_id", instance_id_);
@@ -1152,13 +1183,14 @@ int InstanceRecycler::recycle_versions() {
int InstanceRecycler::recycle_tablets(int64_t table_id, int64_t index_id,
int64_t partition_id,
bool is_empty_tablet) {
- int num_scanned = 0;
- std::atomic_int num_recycled = 0;
+ int64_t num_scanned = 0;
+ std::atomic_long num_recycled = 0;
std::string tablet_key_begin, tablet_key_end;
std::string stats_key_begin, stats_key_end;
std::string job_key_begin, job_key_end;
+ std::string tablet_belongs;
if (partition_id > 0) {
// recycle tablets in a partition belonging to the index
meta_tablet_key({instance_id_, table_id, index_id, partition_id, 0},
&tablet_key_begin);
@@ -1167,6 +1199,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id,
int64_t index_id, int64_
stats_tablet_key({instance_id_, table_id, index_id, partition_id + 1,
0}, &stats_key_end);
job_tablet_key({instance_id_, table_id, index_id, partition_id, 0},
&job_key_begin);
job_tablet_key({instance_id_, table_id, index_id, partition_id + 1,
0}, &job_key_end);
+ tablet_belongs = "partition";
} else {
// recycle tablets in the index
meta_tablet_key({instance_id_, table_id, index_id, 0, 0},
&tablet_key_begin);
@@ -1175,9 +1208,10 @@ int InstanceRecycler::recycle_tablets(int64_t table_id,
int64_t index_id, int64_
stats_tablet_key({instance_id_, table_id, index_id + 1, 0, 0},
&stats_key_end);
job_tablet_key({instance_id_, table_id, index_id, 0, 0},
&job_key_begin);
job_tablet_key({instance_id_, table_id, index_id + 1, 0, 0},
&job_key_end);
+ tablet_belongs = "index";
}
- LOG_INFO("begin to recycle tablets")
+ LOG_INFO("begin to recycle tablets of the " + tablet_belongs)
.tag("table_id", table_id)
.tag("index_id", index_id)
.tag("partition_id", partition_id);
@@ -1186,7 +1220,7 @@ int InstanceRecycler::recycle_tablets(int64_t table_id,
int64_t index_id, int64_
std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
auto cost = duration<float>(steady_clock::now() - start_time).count();
- LOG_INFO("recycle tablets finished, cost={}s", cost)
+ LOG_INFO("recycle tablets of " + tablet_belongs + " finished,
cost={}s", cost)
.tag("instance_id", instance_id_)
.tag("table_id", table_id)
.tag("index_id", index_id)
@@ -1612,12 +1646,15 @@ int InstanceRecycler::recycle_tablet(int64_t tablet_id)
{
int InstanceRecycler::recycle_rowsets() {
const std::string task_name = "recycle_rowsets";
- int num_scanned = 0;
- int num_expired = 0;
- int num_prepare = 0;
- size_t total_rowset_size = 0;
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_prepare = 0;
+ int64_t num_compacted = 0;
+ int64_t num_empty_rowset = 0;
+ size_t total_rowset_key_size = 0;
+ size_t total_rowset_value_size = 0;
size_t expired_rowset_size = 0;
- std::atomic_int num_recycled = 0;
+ std::atomic_long num_recycled = 0;
RecycleRowsetKeyInfo recyc_rs_key_info0 {instance_id_, 0, ""};
RecycleRowsetKeyInfo recyc_rs_key_info1 {instance_id_, INT64_MAX, ""};
@@ -1640,8 +1677,11 @@ int InstanceRecycler::recycle_rowsets() {
.tag("num_scanned", num_scanned)
.tag("num_expired", num_expired)
.tag("num_recycled", num_recycled)
- .tag("num_prepare", num_prepare)
- .tag("total_rowset_meta_size", total_rowset_size)
+ .tag("num_recycled.prepare", num_prepare)
+ .tag("num_recycled.compacted", num_compacted)
+ .tag("num_recycled.empty_rowset", num_empty_rowset)
+ .tag("total_rowset_meta_key_size_scanned",
total_rowset_key_size)
+ .tag("total_rowset_meta_value_size_scanned",
total_rowset_value_size)
.tag("expired_rowset_meta_size", expired_rowset_size);
});
@@ -1692,9 +1732,11 @@ int InstanceRecycler::recycle_rowsets() {
return 0;
};
- auto calc_expiration = [](const RecycleRowsetPB& rs) -> int64_t {
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const RecycleRowsetPB& rs) {
if (config::force_immediate_recycle) {
- return 0;
+ return 0L;
}
// RecycleRowsetPB created by compacted or dropped rowset has no
expiration time, and will be recycled when exceed retention time
int64_t expiration = rs.expiration() > 0 ? rs.expiration() :
rs.creation_time();
@@ -1703,12 +1745,18 @@ int InstanceRecycler::recycle_rowsets() {
retention_seconds =
std::min(config::compacted_rowset_retention_seconds,
retention_seconds);
}
- return expiration + retention_seconds;
+ int64_t final_expiration = expiration + retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_rowset_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
};
auto handle_rowset_kv = [&](std::string_view k, std::string_view v) -> int
{
++num_scanned;
- total_rowset_size += v.size();
+ total_rowset_key_size += k.size();
+ total_rowset_value_size += v.size();
RecycleRowsetPB rowset;
if (!rowset.ParseFromArray(v.data(), v.size())) {
LOG_WARNING("malformed recycle rowset").tag("key", hex(k));
@@ -1780,9 +1828,12 @@ int InstanceRecycler::recycle_rowsets() {
return -1;
}
} else {
+ num_compacted += rowset.type() == RecycleRowsetPB::COMPACT;
rowset_keys.emplace_back(k);
if (rowset_meta->num_segments() > 0) { // Skip empty rowset
rowsets.push_back(std::move(*rowset_meta));
+ } else {
+ ++num_empty_rowset;
}
}
return 0;
@@ -1823,8 +1874,7 @@ int InstanceRecycler::recycle_rowsets() {
return ret;
}
-bool check_txn_abort(std::shared_ptr<TxnKv> txn_kv, const std::string&
instance_id,
- int64_t txn_id) {
+bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv, const std::string&
instance_id, int64_t txn_id) {
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
if (err != TxnErrorCode::TXN_OK) {
@@ -1883,11 +1933,12 @@ bool check_txn_abort(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_
int InstanceRecycler::recycle_tmp_rowsets() {
const std::string task_name = "recycle_tmp_rowsets";
- int num_scanned = 0;
- int num_expired = 0;
- int num_recycled = 0;
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
size_t expired_rowset_size = 0;
- size_t total_rowset_size = 0;
+ size_t total_rowset_key_size = 0;
+ size_t total_rowset_value_size = 0;
MetaRowsetTmpKeyInfo tmp_rs_key_info0 {instance_id_, 0, 0};
MetaRowsetTmpKeyInfo tmp_rs_key_info1 {instance_id_, INT64_MAX, 0};
@@ -1910,41 +1961,54 @@ int InstanceRecycler::recycle_tmp_rowsets() {
.tag("num_scanned", num_scanned)
.tag("num_expired", num_expired)
.tag("num_recycled", num_recycled)
- .tag("total_rowset_meta_size", total_rowset_size)
- .tag("expired_rowset_meta_size", expired_rowset_size);
+ .tag("total_rowset_meta_key_size_scanned",
total_rowset_key_size)
+ .tag("total_rowset_meta_value_size_scanned",
total_rowset_value_size)
+ .tag("expired_rowset_meta_size_recycled", expired_rowset_size);
});
// Elements in `tmp_rowset_keys` has the same lifetime as `it`
std::vector<std::string_view> tmp_rowset_keys;
std::vector<doris::RowsetMetaCloudPB> tmp_rowsets;
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+ auto calc_expiration = [&earlest_ts, this](const doris::RowsetMetaCloudPB&
rowset) {
+ // ATTN: `txn_expiration` should > 0, however we use `creation_time` +
a large `retention_time` (> 1 day in production environment)
+ // when `txn_expiration` <= 0 in some unexpected situation (usually
when there are bugs). This is usually safe, coz loading
+ // duration or timeout always < `retention_time` in practice.
+ int64_t expiration =
+ rowset.txn_expiration() > 0 ? rowset.txn_expiration() :
rowset.creation_time();
+ expiration = config::force_immediate_recycle ? 0 : expiration;
+ int64_t final_expiration = expiration + config::retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_tmp_rowset_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
+ };
+
auto handle_rowset_kv = [&num_scanned, &num_expired, &tmp_rowset_keys,
&tmp_rowsets,
- &expired_rowset_size, &total_rowset_size,
+ &expired_rowset_size, &total_rowset_key_size,
&total_rowset_value_size,
+ &calc_expiration,
this](std::string_view k, std::string_view v) ->
int {
++num_scanned;
- total_rowset_size += v.size();
+ total_rowset_key_size += k.size();
+ total_rowset_value_size += v.size();
doris::RowsetMetaCloudPB rowset;
if (!rowset.ParseFromArray(v.data(), v.size())) {
LOG_WARNING("malformed rowset meta").tag("key", hex(k));
return -1;
}
- int64_t current_time = ::time(nullptr);
- // ATTN: `txn_expiration` should > 0, however we use `creation_time` +
a large `retention_time` (> 1 day in production environment)
- // when `txn_expiration` <= 0 in some unexpected situation (usually
when there are bugs). This is usually safe, coz loading
- // duration or timeout always < `retention_time` in practice.
- int64_t expiration = config::force_immediate_recycle ? 0
- : rowset.txn_expiration() > 0 ?
rowset.txn_expiration()
- :
rowset.creation_time();
+ int64_t expiration = calc_expiration(rowset);
VLOG_DEBUG << "recycle tmp rowset scan, key=" << hex(k) << "
num_scanned=" << num_scanned
<< " num_expired=" << num_expired << " expiration=" <<
expiration
<< " txn_expiration=" << rowset.txn_expiration()
<< " rowset_creation_time=" << rowset.creation_time();
- if (current_time < expiration + config::retention_seconds) {
- // not expired
+ int64_t current_time = ::time(nullptr);
+ if (current_time < expiration) { // not expired
return 0;
}
- if (!check_txn_abort(txn_kv_, instance_id_, rowset.txn_id())) {
+ if (!is_txn_aborted(txn_kv_, instance_id_, rowset.txn_id())) {
return 0;
}
@@ -1964,7 +2028,9 @@ int InstanceRecycler::recycle_tmp_rowsets() {
<< " tablet_id=" << rowset.tablet_id() << " rowset_id=" <<
rowset.rowset_id_v2()
<< " version=[" << rowset.start_version() << '-' <<
rowset.end_version()
<< "] txn_id=" << rowset.txn_id() << " rowset_meta_size=" <<
v.size()
- << " creation_time" << rowset.creation_time();
+ << " creation_time=" << rowset.creation_time() << "
num_scanned=" << num_scanned
+ << " num_expired=" << num_expired;
+
tmp_rowset_keys.push_back(k);
if (rowset.num_segments() > 0) { // Skip empty rowset
tmp_rowsets.push_back(std::move(rowset));
@@ -1997,31 +2063,57 @@ int InstanceRecycler::scan_and_recycle(
std::string begin, std::string_view end,
std::function<int(std::string_view k, std::string_view v)>
recycle_func,
std::function<int()> loop_done) {
+ LOG(INFO) << "begin scan_and_recycle key_range=[" << hex(begin) << "," <<
hex(end) << ")";
int ret = 0;
+ int64_t cnt = 0;
+ int get_range_retried = 0;
+ std::string err;
+ std::unique_ptr<int, std::function<void(int*)>> defer_log(
+ (int*)0x01, [begin, end, &err, &ret, &cnt,
&get_range_retried](int*) {
+ LOG(INFO) << "finish scan_and_recycle key_range=[" <<
hex(begin) << "," << hex(end)
+ << ") num_scanned=" << cnt << " get_range_retried="
<< get_range_retried
+ << " ret=" << ret << " err=" << err;
+ });
+
std::unique_ptr<RangeGetIterator> it;
do {
- int get_ret = txn_get(txn_kv_.get(), begin, end, it);
- if (get_ret != 0) {
- LOG(WARNING) << "failed to get kv, key=" << begin << " ret=" <<
get_ret;
+ if (get_range_retried > 1000) {
+ err = "txn_get exceeds max retry, may not scan all keys";
+ ret = -1;
return -1;
}
- VLOG_DEBUG << "fetch " << it->size() << " kv";
+ int get_ret = txn_get(txn_kv_.get(), begin, end, it);
+ if (get_ret != 0) { // txn kv may complain "Request for future version"
+ LOG(WARNING) << "failed to get kv, range=[" << hex(begin) << ","
<< hex(end)
+ << ") num_scanned=" << cnt << " txn_get_ret=" <<
get_ret
+ << " get_range_retried=" << get_range_retried;
+ ++get_range_retried;
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ continue; // try again
+ }
if (!it->has_next()) {
- VLOG_DEBUG << "no keys in the given range, begin=" << hex(begin)
<< " end=" << hex(end);
- break;
+ LOG(INFO) << "no keys in the given range=[" << hex(begin) << ","
<< hex(end) << ")";
+ break; // scan finished
}
while (it->has_next()) {
+ ++cnt;
// recycle corresponding resources
auto [k, v] = it->next();
if (!it->has_next()) {
begin = k;
VLOG_DEBUG << "iterator has no more kvs. key=" << hex(k);
}
- if (recycle_func(k, v) != 0) ret = -1;
+ // if we want to continue scanning, the recycle_func should not
return non-zero
+ if (recycle_func(k, v) != 0) {
+ err = "recycle_func error";
+ ret = -1;
+ }
}
begin.push_back('\x00'); // Update to next smallest key for iteration
- if (loop_done) {
- if (loop_done() != 0) ret = -1;
+ // if we want to continue scanning, the recycle_func should not return
non-zero
+ if (loop_done && loop_done() != 0) {
+ err = "loop_done error";
+ ret = -1;
}
} while (it->more() && !stopped());
return ret;
@@ -2029,10 +2121,10 @@ int InstanceRecycler::scan_and_recycle(
int InstanceRecycler::abort_timeout_txn() {
const std::string task_name = "abort_timeout_txn";
- int num_scanned = 0;
- int num_timeout = 0;
- int num_abort = 0;
- int num_advance = 0;
+ int64_t num_scanned = 0;
+ int64_t num_timeout = 0;
+ int64_t num_abort = 0;
+ int64_t num_advance = 0;
TxnRunningKeyInfo txn_running_key_info0 {instance_id_, 0, 0};
TxnRunningKeyInfo txn_running_key_info1 {instance_id_, INT64_MAX,
INT64_MAX};
@@ -2169,9 +2261,9 @@ int InstanceRecycler::abort_timeout_txn() {
int InstanceRecycler::recycle_expired_txn_label() {
const std::string task_name = "recycle_expired_txn_label";
- int num_scanned = 0;
- int num_expired = 0;
- int num_recycled = 0;
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
RecycleTxnKeyInfo recycle_txn_key_info0 {instance_id_, 0, 0};
RecycleTxnKeyInfo recycle_txn_key_info1 {instance_id_, INT64_MAX,
INT64_MAX};
@@ -2195,11 +2287,23 @@ int InstanceRecycler::recycle_expired_txn_label() {
.tag("num_recycled", num_recycled);
});
- int64_t current_time =
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+ auto calc_expiration = [&earlest_ts, this](const RecycleTxnPB&
recycle_txn_pb) {
+ int64_t final_expiration =
+ recycle_txn_pb.creation_time() + config::label_keep_max_second
* 1000L;
+ if (earlest_ts > final_expiration / 1000) {
+ earlest_ts = final_expiration / 1000;
+
g_bvar_recycler_recycle_expired_txn_label_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
+ };
+
+ int64_t current_time_ms =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
- auto handle_recycle_txn_kv = [&num_scanned, &num_expired, &num_recycled,
¤t_time, this](
- std::string_view k, std::string_view
v) -> int {
+ auto handle_recycle_txn_kv = [&num_scanned, &num_expired, &num_recycled,
¤t_time_ms,
+ &calc_expiration,
+ this](std::string_view k, std::string_view
v) -> int {
++num_scanned;
RecycleTxnPB recycle_txn_pb;
if (!recycle_txn_pb.ParseFromArray(v.data(), v.size())) {
@@ -2208,13 +2312,13 @@ int InstanceRecycler::recycle_expired_txn_label() {
}
if ((config::force_immediate_recycle) ||
(recycle_txn_pb.has_immediate() && recycle_txn_pb.immediate()) ||
- (recycle_txn_pb.creation_time() + config::label_keep_max_second *
1000L <=
- current_time)) {
- LOG_INFO("found recycle txn").tag("key", hex(k));
+ (calc_expiration(recycle_txn_pb) <= current_time_ms)) {
+ VLOG_DEBUG << "found recycle txn, key=" << hex(k);
num_expired++;
} else {
return 0;
}
+
std::string_view k1 = k;
//RecycleTxnKeyInfo 0:instance_id 1:db_id 2:txn_id
k1.remove_prefix(1); // Remove key space
@@ -2414,10 +2518,10 @@ private:
};
int InstanceRecycler::recycle_copy_jobs() {
- int num_scanned = 0;
- int num_finished = 0;
- int num_expired = 0;
- int num_recycled = 0;
+ int64_t num_scanned = 0;
+ int64_t num_finished = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
// Used for INTERNAL stage's copy jobs to tag each batch for log trace
uint64_t batch_count = 0;
const std::string task_name = "recycle_copy_jobs";
@@ -2659,8 +2763,8 @@ int InstanceRecycler::init_copy_job_accessor(const
std::string& stage_id,
}
int InstanceRecycler::recycle_stage() {
- int num_scanned = 0;
- int num_recycled = 0;
+ int64_t num_scanned = 0;
+ int64_t num_recycled = 0;
const std::string task_name = "recycle_stage";
LOG_INFO("begin to recycle stage").tag("instance_id", instance_id_);
@@ -2680,12 +2784,9 @@ int InstanceRecycler::recycle_stage() {
RecycleStageKeyInfo key_info0 {instance_id_, ""};
RecycleStageKeyInfo key_info1 {instance_id_, "\xff"};
- std::string key0;
- std::string key1;
- recycle_stage_key(key_info0, &key0);
- recycle_stage_key(key_info1, &key1);
+ std::string key0 = recycle_stage_key(key_info0);
+ std::string key1 = recycle_stage_key(key_info1);
- // Elements in `tmp_rowset_keys` has the same lifetime as `it`
std::vector<std::string_view> stage_keys;
auto recycle_func = [&start_time, &num_scanned, &num_recycled,
&stage_keys, this](
std::string_view k, std::string_view v) -> int
{
@@ -2775,6 +2876,12 @@ int InstanceRecycler::recycle_expired_stage_objects() {
});
int ret = 0;
for (const auto& stage : instance_info_.stages()) {
+ std::stringstream ss;
+ ss << "instance_id=" << instance_id_ << ", stage_id=" <<
stage.stage_id()
+ << ", user_name=" << stage.mysql_user_name().at(0)
+ << ", user_id=" << stage.mysql_user_id().at(0)
+ << ", prefix=" << stage.obj_info().prefix();
+
if (stopped()) break;
if (stage.type() == StagePB::EXTERNAL) {
continue;
@@ -2788,7 +2895,7 @@ int InstanceRecycler::recycle_expired_stage_objects() {
const auto& old_obj = instance_info_.obj_info()[idx - 1];
auto s3_conf = S3Conf::from_obj_store_info(old_obj);
if (!s3_conf) {
- LOG(WARNING) << "failed to init accessor";
+ LOG(WARNING) << "failed to init s3_conf with obj_info=" <<
old_obj.DebugString();
continue;
}
@@ -2796,16 +2903,18 @@ int InstanceRecycler::recycle_expired_stage_objects() {
std::shared_ptr<S3Accessor> accessor;
int ret1 = S3Accessor::create(std::move(*s3_conf), &accessor);
if (ret1 != 0) {
- LOG(WARNING) << "failed to init s3 accessor ret=" << ret1;
+ LOG(WARNING) << "failed to init s3 accessor ret=" << ret1 << " "
<< ss.str();
+ ret = -1;
+ continue;
+ }
+
+ if (s3_conf->prefix.find("/stage/") == std::string::npos) {
+ LOG(WARNING) << "try to delete illegal prefix, which is
catastrophic, " << ss.str();
ret = -1;
continue;
}
- LOG(INFO) << "recycle expired stage objects, instance_id=" <<
instance_id_
- << ", stage_id=" << stage.stage_id()
- << ", user_name=" << stage.mysql_user_name().at(0)
- << ", user_id=" << stage.mysql_user_id().at(0)
- << ", prefix=" << stage.obj_info().prefix();
+ LOG(INFO) << "recycle expired stage objects, " << ss.str();
int64_t expiration_time =
duration_cast<seconds>(system_clock::now().time_since_epoch()).count() -
config::internal_stage_objects_expire_time_second;
@@ -2814,8 +2923,8 @@ int InstanceRecycler::recycle_expired_stage_objects() {
}
ret1 = accessor->delete_all(expiration_time);
if (ret1 != 0) {
- LOG(WARNING) << "failed to recycle expired stage objects,
instance_id=" << instance_id_
- << ", stage_id=" << stage.stage_id() << ", ret=" <<
ret1;
+ LOG(WARNING) << "failed to recycle expired stage objects, ret=" <<
ret1 << " "
+ << ss.str();
ret = -1;
continue;
}
diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h
index 91a461f474f..cf23dcacd2f 100644
--- a/cloud/src/recycler/recycler.h
+++ b/cloud/src/recycler/recycler.h
@@ -55,7 +55,9 @@ struct RecyclerThreadPoolGroup {
RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup& other) =
default;
RecyclerThreadPoolGroup& operator=(RecyclerThreadPoolGroup&& other) =
default;
RecyclerThreadPoolGroup(RecyclerThreadPoolGroup&&) = default;
+ // used for accessor.delete_files, accessor.delete_directory
std::shared_ptr<SimpleThreadPool> s3_producer_pool;
+ // used for InstanceRecycler::recycle_tablet
std::shared_ptr<SimpleThreadPool> recycle_tablet_pool;
std::shared_ptr<SimpleThreadPool> group_recycle_function_pool;
};
@@ -128,19 +130,26 @@ public:
// returns 0 for success otherwise error
int recycle_deleted_instance();
- // scan and recycle expired indexes
+ // scan and recycle expired indexes:
+ // 1. dropped table, dropped mv
+ // 2. half-successtable/index when create
// returns 0 for success otherwise error
int recycle_indexes();
- // scan and recycle expired partitions
+ // scan and recycle expired partitions:
+ // 1. dropped parttion
+ // 2. half-success partition when create
// returns 0 for success otherwise error
int recycle_partitions();
- // scan and recycle expired rowsets
+ // scan and recycle expired rowsets:
+ // 1. prepare_rowset will produce recycle_rowset before uploading data to
remote storage (memo)
+ // 2. compaction will change the input rowsets to recycle_rowset
// returns 0 for success otherwise error
int recycle_rowsets();
- // scan and recycle expired tmp rowsets
+ // scan and recycle expired tmp rowsets:
+ // 1. commit_rowset will produce tmp_rowset when finish upload data (load
or compaction) to remote storage
// returns 0 for success otherwise error
int recycle_tmp_rowsets();
@@ -203,12 +212,15 @@ private:
int scan_and_recycle(std::string begin, std::string_view end,
std::function<int(std::string_view k,
std::string_view v)> recycle_func,
std::function<int()> loop_done = nullptr);
+
// return 0 for success otherwise error
int delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta_pb);
+
// return 0 for success otherwise error
// NOTE: this function ONLY be called when the file paths cannot be
calculated
int delete_rowset_data(const std::string& resource_id, int64_t tablet_id,
const std::string& rowset_id);
+
// return 0 for success otherwise error
int delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>&
rowsets);
diff --git a/cloud/src/recycler/s3_accessor.cpp
b/cloud/src/recycler/s3_accessor.cpp
index 1aca88d2d11..224b36c277c 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -282,6 +282,11 @@ int S3Accessor::init() {
Aws::Client::ClientConfiguration aws_config;
aws_config.endpointOverride = conf_.endpoint;
aws_config.region = conf_.region;
+ // Aws::Http::CurlHandleContainer::AcquireCurlHandle() may be blocked
if the connecitons are bottleneck
+ aws_config.maxConnections =
std::max((long)(config::recycle_pool_parallelism +
+
config::instance_recycler_worker_pool_size),
+ (long)aws_config.maxConnections);
+
if (config::s3_client_http_scheme == "http") {
aws_config.scheme = Aws::Http::Scheme::HTTP;
}
@@ -349,7 +354,12 @@ int S3Accessor::delete_files(const
std::vector<std::string>& paths) {
int S3Accessor::delete_file(const std::string& path) {
LOG_INFO("delete file").tag("uri", to_uri(path));
- return obj_client_->delete_object({.bucket = conf_.bucket, .key =
get_key(path)}).ret;
+ int ret = obj_client_->delete_object({.bucket = conf_.bucket, .key =
get_key(path)}).ret;
+ static_assert(ObjectStorageResponse::OK == 0);
+ if (ret == ObjectStorageResponse::OK || ret ==
ObjectStorageResponse::NOT_FOUND) {
+ return 0;
+ }
+ return ret;
}
int S3Accessor::put_file(const std::string& path, const std::string& content) {
@@ -392,21 +402,45 @@ int S3Accessor::check_versioning() {
}
int GcsAccessor::delete_prefix_impl(const std::string& path_prefix, int64_t
expiration_time) {
- LOG_INFO("delete prefix").tag("uri", to_uri(path_prefix));
+ LOG_INFO("begin delete prefix").tag("uri", to_uri(path_prefix));
int ret = 0;
+ int cnt = 0;
+ int skip = 0;
+ int64_t del_nonexisted = 0;
+ int del = 0;
auto iter = obj_client_->list_objects({conf_.bucket,
get_key(path_prefix)});
for (auto obj = iter->next(); obj.has_value(); obj = iter->next()) {
+ if (!(++cnt % 100)) {
+ LOG_INFO("loop delete prefix")
+ .tag("uri", to_uri(path_prefix))
+ .tag("total_obj_cnt", cnt)
+ .tag("deleted", del)
+ .tag("del_nonexisted", del_nonexisted)
+ .tag("skipped", skip);
+ }
if (expiration_time > 0 && obj->mtime_s > expiration_time) {
+ skip++;
continue;
}
+ del++;
- // FIXME(plat1ko): Delete objects by batch
- if (int del_ret = obj_client_->delete_object({conf_.bucket,
obj->key}).ret; del_ret != 0) {
+ // FIXME(plat1ko): Delete objects by batch with genuine GCS client
+ int del_ret = obj_client_->delete_object({conf_.bucket, obj->key}).ret;
+ del_nonexisted += (del_ret == ObjectStorageResponse::NOT_FOUND);
+ static_assert(ObjectStorageResponse::OK == 0);
+ if (del_ret != ObjectStorageResponse::OK && del_ret !=
ObjectStorageResponse::NOT_FOUND) {
ret = del_ret;
}
}
+ LOG_INFO("finish delete prefix")
+ .tag("uri", to_uri(path_prefix))
+ .tag("total_obj_cnt", cnt)
+ .tag("deleted", del)
+ .tag("del_nonexisted", del_nonexisted)
+ .tag("skipped", skip);
+
if (!iter->is_valid()) {
return -1;
}
diff --git a/cloud/src/recycler/s3_obj_client.cpp
b/cloud/src/recycler/s3_obj_client.cpp
index c8dcdad18d7..0e548819d25 100644
--- a/cloud/src/recycler/s3_obj_client.cpp
+++ b/cloud/src/recycler/s3_obj_client.cpp
@@ -293,9 +293,12 @@ ObjectStorageResponse
S3ObjClient::delete_object(ObjectStoragePathRef path) {
.tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
.tag("error", outcome.GetError().GetMessage())
.tag("exception", outcome.GetError().GetExceptionName());
- return -1;
+ if (outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
+ return {ObjectStorageResponse::NOT_FOUND,
outcome.GetError().GetMessage()};
+ }
+ return {ObjectStorageResponse::UNDEFINED,
outcome.GetError().GetMessage()};
}
- return 0;
+ return {ObjectStorageResponse::OK};
}
ObjectStorageResponse
S3ObjClient::delete_objects_recursively(ObjectStoragePathRef path,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]