This is an automated email from the ASF dual-hosted git repository. wangdan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pegasus.git
commit cbfc0e84288a92acb998185e43dd3345faaa0078 Author: Dan Wang <[email protected]> AuthorDate: Thu May 25 15:00:26 2023 +0800 feat(new_metrics): migrate metrics for task queue (#1484) https://github.com/apache/incubator-pegasus/issues/1483 All 3 metrics of the task queue are migrated to the new framework: the length of the task queue, and the cumulative numbers of tasks delayed/rejected by throttling before enqueue. All of these metrics are attached to the queue-level entity, which is newly introduced in this PR. Furthermore, this PR solves some other problems: - The circular inclusion of header files between `common/json_helper.h` and `utils/metrics.h` led to a compilation error; it is resolved by dropping the redundant header file `runtime/tool_api.h` from the include chain. - The undefined reference to `http_service::register_handler` while compiling on macOS is resolved by linking `dsn_http` to the static library of `dsn_utils`. - GitHub Actions ran out of disk space while building releases; this is resolved by removing directories and tarballs after packing the server and tools to free up space, which is also tracked by another issue: https://github.com/apache/incubator-pegasus/issues/1485 - Add hint messages to the errors raised while getting entity pointers. 
--- src/base/test/CMakeLists.txt | 8 +-- src/common/fs_manager.cpp | 11 ++-- src/common/fs_manager.h | 2 + src/common/serialization_helper/thrift_helper.h | 1 - src/meta/meta_backup_service.cpp | 8 ++- src/meta/meta_backup_service.h | 1 + src/meta/table_metrics.cpp | 13 ++-- src/replica/replica_base.h | 7 ++- src/runtime/task/task_queue.cpp | 80 ++++++++++++++++--------- src/runtime/task/task_queue.h | 19 +++--- src/utils/CMakeLists.txt | 1 + src/utils/metrics.h | 12 +++- 12 files changed, 105 insertions(+), 58 deletions(-) diff --git a/src/base/test/CMakeLists.txt b/src/base/test/CMakeLists.txt index 651e0c055..b7e1c29d0 100644 --- a/src/base/test/CMakeLists.txt +++ b/src/base/test/CMakeLists.txt @@ -28,10 +28,10 @@ set(MY_PROJ_SRC "") set(MY_SRC_SEARCH_MODE "GLOB") set(MY_PROJ_LIBS - dsn_runtime - dsn_utils - pegasus_base - gtest) + dsn_runtime + dsn_utils + pegasus_base + gtest) set(MY_BOOST_LIBS Boost::system Boost::filesystem) diff --git a/src/common/fs_manager.cpp b/src/common/fs_manager.cpp index 1c01486a9..6acb50b11 100644 --- a/src/common/fs_manager.cpp +++ b/src/common/fs_manager.cpp @@ -100,7 +100,9 @@ metric_entity_ptr instantiate_disk_metric_entity(const std::string &tag, } // anonymous namespace disk_capacity_metrics::disk_capacity_metrics(const std::string &tag, const std::string &data_dir) - : _disk_metric_entity(instantiate_disk_metric_entity(tag, data_dir)), + : _tag(tag), + _data_dir(data_dir), + _disk_metric_entity(instantiate_disk_metric_entity(tag, data_dir)), METRIC_VAR_INIT_disk(disk_capacity_total_mb), METRIC_VAR_INIT_disk(disk_capacity_avail_mb) { @@ -109,9 +111,10 @@ disk_capacity_metrics::disk_capacity_metrics(const std::string &tag, const std:: const metric_entity_ptr &disk_capacity_metrics::disk_metric_entity() const { CHECK_NOTNULL(_disk_metric_entity, - "disk metric entity should has been instantiated: " - "uninitialized entity cannot be used to instantiate " - "metric"); + "disk metric entity (tag={}, data_dir={}) should has been 
instantiated: " + "uninitialized entity cannot be used to instantiate metric", + _tag, + _data_dir); return _disk_metric_entity; } diff --git a/src/common/fs_manager.h b/src/common/fs_manager.h index 2791fe66b..f6edb75ed 100644 --- a/src/common/fs_manager.h +++ b/src/common/fs_manager.h @@ -58,6 +58,8 @@ public: METRIC_DEFINE_SET(disk_capacity_avail_mb, int64_t) private: + const std::string _tag; + const std::string _data_dir; const metric_entity_ptr _disk_metric_entity; METRIC_VAR_DECLARE_gauge_int64(disk_capacity_total_mb); METRIC_VAR_DECLARE_gauge_int64(disk_capacity_avail_mb); diff --git a/src/common/serialization_helper/thrift_helper.h b/src/common/serialization_helper/thrift_helper.h index 83d32ac93..368a2bb27 100644 --- a/src/common/serialization_helper/thrift_helper.h +++ b/src/common/serialization_helper/thrift_helper.h @@ -26,7 +26,6 @@ #pragma once -#include "runtime/tool_api.h" #include "runtime/rpc/rpc_host_port.h" #include "runtime/rpc/rpc_stream.h" diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp index 98cdf1e77..e1d9f677a 100644 --- a/src/meta/meta_backup_service.cpp +++ b/src/meta/meta_backup_service.cpp @@ -78,7 +78,8 @@ metric_entity_ptr instantiate_backup_policy_metric_entity(const std::string &pol } // anonymous namespace backup_policy_metrics::backup_policy_metrics(const std::string &policy_name) - : _backup_policy_metric_entity(instantiate_backup_policy_metric_entity(policy_name)), + : _policy_name(policy_name), + _backup_policy_metric_entity(instantiate_backup_policy_metric_entity(policy_name)), METRIC_VAR_INIT_backup_policy(backup_recent_duration_ms) { } @@ -86,8 +87,9 @@ backup_policy_metrics::backup_policy_metrics(const std::string &policy_name) const metric_entity_ptr &backup_policy_metrics::backup_policy_metric_entity() const { CHECK_NOTNULL(_backup_policy_metric_entity, - "backup_policy metric entity should has been instantiated: " - "uninitialized entity cannot be used to instantiate metric"); + 
"backup_policy metric entity (policy_name={}) should has been instantiated: " + "uninitialized entity cannot be used to instantiate metric", + _policy_name); return _backup_policy_metric_entity; } diff --git a/src/meta/meta_backup_service.h b/src/meta/meta_backup_service.h index 280cd8f71..e382a8922 100644 --- a/src/meta/meta_backup_service.h +++ b/src/meta/meta_backup_service.h @@ -189,6 +189,7 @@ public: METRIC_DEFINE_SET(backup_recent_duration_ms, int64_t) private: + const std::string _policy_name; const metric_entity_ptr _backup_policy_metric_entity; METRIC_VAR_DECLARE_gauge_int64(backup_recent_duration_ms); diff --git a/src/meta/table_metrics.cpp b/src/meta/table_metrics.cpp index c63dd1f09..73b45d69e 100644 --- a/src/meta/table_metrics.cpp +++ b/src/meta/table_metrics.cpp @@ -146,9 +146,10 @@ partition_metrics::partition_metrics(int32_t table_id, int32_t partition_id) const metric_entity_ptr &partition_metrics::partition_metric_entity() const { CHECK_NOTNULL(_partition_metric_entity, - "partition metric entity should has been instantiated: " - "uninitialized entity cannot be used to instantiate " - "metric"); + "partition metric entity (table_id={}, partition_id={}) should has been " + "instantiated: uninitialized entity cannot be used to instantiate metric", + _table_id, + _partition_id); return _partition_metric_entity; } @@ -191,9 +192,9 @@ table_metrics::table_metrics(int32_t table_id, int32_t partition_count) const metric_entity_ptr &table_metrics::table_metric_entity() const { CHECK_NOTNULL(_table_metric_entity, - "table metric entity should has been instantiated: " - "uninitialized entity cannot be used to instantiate " - "metric"); + "table metric entity (table_id={}) should has been instantiated: " + "uninitialized entity cannot be used to instantiate metric", + _table_id); return _table_metric_entity; } diff --git a/src/replica/replica_base.h b/src/replica/replica_base.h index e63e09181..8909197a6 100644 --- a/src/replica/replica_base.h +++ 
b/src/replica/replica_base.h @@ -57,9 +57,10 @@ struct replica_base const metric_entity_ptr &replica_metric_entity() const { CHECK_NOTNULL(_replica_metric_entity, - "replica metric entity should has been instantiated: " - "uninitialized entity cannot be used to instantiate " - "metric"); + "replica metric entity (table_id={}, partition_id={}) should has been " + "instantiated: uninitialized entity cannot be used to instantiate metric", + _gpid.get_app_id(), + _gpid.get_partition_index()); return _replica_metric_entity; } diff --git a/src/runtime/task/task_queue.cpp b/src/runtime/task/task_queue.cpp index 452a36886..19de1954c 100644 --- a/src/runtime/task/task_queue.cpp +++ b/src/runtime/task/task_queue.cpp @@ -26,12 +26,10 @@ #include "task_queue.h" -#include <stdio.h> - +#include "fmt/core.h" #include "runtime/rpc/network.h" #include "runtime/rpc/rpc_engine.h" #include "runtime/rpc/rpc_message.h" -#include "runtime/service_engine.h" #include "runtime/task/task.h" #include "runtime/task/task_spec.h" #include "task_engine.h" @@ -39,35 +37,51 @@ #include "utils/error_code.h" #include "utils/exp_delay.h" #include "utils/fmt_logging.h" +#include "utils/string_view.h" #include "utils/threadpool_spec.h" +METRIC_DEFINE_entity(queue); + +METRIC_DEFINE_gauge_int64(queue, + queue_length, + dsn::metric_unit::kTasks, + "The length of task queue"); + +METRIC_DEFINE_counter(queue, + queue_delayed_tasks, + dsn::metric_unit::kTasks, + "The accumulative number of delayed tasks by throttling before enqueue"); + +METRIC_DEFINE_counter(queue, + queue_rejected_tasks, + dsn::metric_unit::kTasks, + "The accumulative number of rejeced tasks by throttling before enqueue"); + namespace dsn { +namespace { + +metric_entity_ptr instantiate_queue_metric_entity(const std::string &queue_name) +{ + auto entity_id = fmt::format("queue_{}", queue_name); + + return METRIC_ENTITY_queue.instantiate(entity_id, {{"queue_name", queue_name}}); +} + +} // anonymous namespace + 
task_queue::task_queue(task_worker_pool *pool, int index, task_queue *inner_provider) - : _pool(pool), _queue_length(0) + : _pool(pool), + _name(fmt::format("{}.{}", pool->spec().name, index)), + _index(index), + _queue_length(0), + _spec(const_cast<threadpool_spec *>(&pool->spec())), + _virtual_queue_length(0), + _queue_metric_entity(instantiate_queue_metric_entity(_name)), + METRIC_VAR_INIT_queue(queue_length), + METRIC_VAR_INIT_queue(queue_delayed_tasks), + METRIC_VAR_INIT_queue(queue_rejected_tasks) { - char num[30]; - sprintf(num, "%u", index); - _index = index; - _name = pool->spec().name + '.'; - _name.append(num); - _queue_length_counter.init_global_counter(_pool->node()->full_name(), - "engine", - (_name + ".queue.length").c_str(), - COUNTER_TYPE_NUMBER, - "task queue length"); - _delay_task_counter.init_global_counter(_pool->node()->full_name(), - "engine", - (_name + ".queue.delay_task").c_str(), - COUNTER_TYPE_VOLATILE_NUMBER, - "delay count of tasks before enqueue"); - _reject_task_counter.init_global_counter(_pool->node()->full_name(), - "engine", - (_name + ".queue.reject_task").c_str(), - COUNTER_TYPE_VOLATILE_NUMBER, - "reject count of tasks before enqueue"); - _virtual_queue_length = 0; - _spec = (threadpool_spec *)&pool->spec(); } task_queue::~task_queue() = default; @@ -94,7 +108,7 @@ void task_queue::enqueue_internal(task *task) if (delay_ms > 0) { auto rtask = static_cast<rpc_request_task *>(task); if (rtask->get_request()->io_session->delay_recv(delay_ms)) { - _delay_task_counter->increment(); + METRIC_VAR_INCREMENT(queue_delayed_tasks); } } } else { @@ -104,7 +118,7 @@ void task_queue::enqueue_internal(task *task) auto rtask = static_cast<rpc_request_task *>(task); auto resp = rtask->get_request()->create_response(); task::get_current_rpc()->reply(resp, ERR_BUSY); - _reject_task_counter->increment(); + METRIC_VAR_INCREMENT(queue_rejected_tasks); task->release_ref(); // added in task::enqueue(pool) return; } @@ -114,4 +128,14 @@ void 
task_queue::enqueue_internal(task *task) tls_dsn.last_worker_queue_size = increase_count(); enqueue(task); } + +const metric_entity_ptr &task_queue::queue_metric_entity() const +{ + CHECK_NOTNULL(_queue_metric_entity, + "queue metric entity (queue_name={}) should has been instantiated: " + "uninitialized entity cannot be used to instantiate metric", + _name); + return _queue_metric_entity; +} + } // namespace dsn diff --git a/src/runtime/task/task_queue.h b/src/runtime/task/task_queue.h index c06cd6c50..99036ab18 100644 --- a/src/runtime/task/task_queue.h +++ b/src/runtime/task/task_queue.h @@ -26,12 +26,11 @@ #pragma once -#include <stdint.h> #include <atomic> #include <string> -#include "perf_counter/perf_counter.h" -#include "perf_counter/perf_counter_wrapper.h" +#include "utils/autoref_ptr.h" +#include "utils/metrics.h" namespace dsn { @@ -71,12 +70,12 @@ public: int count() const { return _queue_length.load(std::memory_order_relaxed); } int decrease_count(int count = 1) { - _queue_length_counter->add((int64_t)(-count)); + METRIC_VAR_DECREMENT_BY(queue_length, count); return _queue_length.fetch_sub(count, std::memory_order_relaxed) - count; } int increase_count(int count = 1) { - _queue_length_counter->add(count); + METRIC_VAR_INCREMENT_BY(queue_length, count); return _queue_length.fetch_add(count, std::memory_order_relaxed) + count; } const std::string &get_name() { return _name; } @@ -88,16 +87,20 @@ private: friend class task_worker_pool; void enqueue_internal(task *task); + const metric_entity_ptr &queue_metric_entity() const; + private: task_worker_pool *_pool; std::string _name; int _index; std::atomic<int> _queue_length; - dsn::perf_counter_wrapper _queue_length_counter; - dsn::perf_counter_wrapper _delay_task_counter; - dsn::perf_counter_wrapper _reject_task_counter; threadpool_spec *_spec; volatile int _virtual_queue_length; + + const metric_entity_ptr _queue_metric_entity; + METRIC_VAR_DECLARE_gauge_int64(queue_length); + 
METRIC_VAR_DECLARE_counter(queue_delayed_tasks); + METRIC_VAR_DECLARE_counter(queue_rejected_tasks); }; /*@}*/ } // namespace dsn diff --git a/src/utils/CMakeLists.txt b/src/utils/CMakeLists.txt index e9129fa51..8c04e3adc 100644 --- a/src/utils/CMakeLists.txt +++ b/src/utils/CMakeLists.txt @@ -38,6 +38,7 @@ set(MY_BINPLACES "") if (APPLE) dsn_add_static_library() + target_link_libraries(${MY_PROJ_NAME} PRIVATE dsn_http) else() dsn_add_shared_library() endif() diff --git a/src/utils/metrics.h b/src/utils/metrics.h index 5f8514fd8..d8503d22a 100644 --- a/src/utils/metrics.h +++ b/src/utils/metrics.h @@ -171,8 +171,9 @@ class error_code; #define METRIC_VAR_INIT_table(name, ...) METRIC_VAR_INIT(name, table, ##__VA_ARGS__) #define METRIC_VAR_INIT_partition(name, ...) METRIC_VAR_INIT(name, partition, ##__VA_ARGS__) #define METRIC_VAR_INIT_backup_policy(name, ...) METRIC_VAR_INIT(name, backup_policy, ##__VA_ARGS__) +#define METRIC_VAR_INIT_queue(name, ...) METRIC_VAR_INIT(name, queue, ##__VA_ARGS__) -// Perform increment-related operations on gauges and counters. +// Perform increment_by() operations on gauges and counters. #define METRIC_VAR_INCREMENT_BY(name, x) \ do { \ const auto v = (x); \ @@ -184,6 +185,15 @@ class error_code; // Perform increment() operations on gauges and counters. #define METRIC_VAR_INCREMENT(name) METRIC_VAR_NAME(name)->increment() +// Perform decrement_by() operations on gauges. +#define METRIC_VAR_DECREMENT_BY(name, x) \ + do { \ + const auto v = (x); \ + if (v != 0) { \ + METRIC_VAR_NAME(name)->decrement_by(v); \ + } \ + } while (0) + // Perform decrement() operations on gauges. #define METRIC_VAR_DECREMENT(name) METRIC_VAR_NAME(name)->decrement() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
