This is an automated email from the ASF dual-hosted git repository. martinzink pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 1181498b81f4c6ac18c19c9eb85773556a553e5e Author: Gabor Gyimesi <gamezb...@gmail.com> AuthorDate: Tue Feb 7 11:13:54 2023 +0100 MINIFICPP-2016 Add session commit time metrics Closes #1477 Signed-off-by: Martin Zink <martinz...@apache.org> --- METRICS.md | 18 +++++----- .../cluster/checkers/PrometheusChecker.py | 3 +- libminifi/include/core/ProcessorMetrics.h | 5 +++ libminifi/src/core/Processor.cpp | 8 +++-- libminifi/src/core/ProcessorMetrics.cpp | 20 +++++++++-- libminifi/test/unit/MetricsTests.cpp | 41 +++++++++++++++++++++- 6 files changed, 81 insertions(+), 14 deletions(-) diff --git a/METRICS.md b/METRICS.md index bf6ed91e4..f6c87dbaf 100644 --- a/METRICS.md +++ b/METRICS.md @@ -182,14 +182,16 @@ Regular expressions can also be used for requesting multiple processor metrics a There are general metrics that are available for all processors. Besides these metrics processors can implement additional metrics that are speicific to that processor. -| Metric name | Labels | Description | -|----------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------| -| onTrigger_invocations | metric_class, processor_name, processor_uuid | The number of processor onTrigger calls | -| average_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 onTrigger calls of the processor | -| last_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last onTrigger call of the processor | -| transferred_flow_files | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship | -| transferred_bytes | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship | -| transferred_to_\<relationship\> | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship | +| Metric name | Labels | Description | +|---------------------------------------------|----------------------------------------------|------------------------------------------------------------------------------------------| +| onTrigger_invocations | metric_class, processor_name, processor_uuid | The number of processor onTrigger calls | +| average_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 onTrigger calls of the processor | +| last_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last onTrigger call of the processor | +| average_session_commit_runtime_milliseconds | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 session commit calls of the processor | +| last_session_commit_runtime_milliseconds | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last session commit call of the processor | +| transferred_flow_files | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship | +| transferred_bytes | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship | +| transferred_to_\<relationship\> | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship | | Label | Description | |----------------|------------------------------------------------------------------------| diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py b/docker/test/integration/cluster/checkers/PrometheusChecker.py index 8df4bf565..461e9a5e8 100644 --- a/docker/test/integration/cluster/checkers/PrometheusChecker.py +++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py @@ -55,7 +55,8 @@ class PrometheusChecker: def verify_general_processor_metrics(self, metric_class, processor_name): labels = {'processor_name': processor_name} - return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds'], metric_class, labels) and \ + return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds', + 'minifi_average_session_commit_runtime_milliseconds', 'minifi_last_session_commit_runtime_milliseconds'], metric_class, labels) and \ self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', 'minifi_transferred_bytes'], metric_class, labels) def verify_getfile_metrics(self, metric_class, processor_name): diff --git a/libminifi/include/core/ProcessorMetrics.h b/libminifi/include/core/ProcessorMetrics.h index ae1aa90dc..63e1f7879 100644 --- a/libminifi/include/core/ProcessorMetrics.h +++ b/libminifi/include/core/ProcessorMetrics.h @@ -49,6 +49,10 @@ class ProcessorMetrics : public state::response::ResponseNode { std::chrono::milliseconds getLastOnTriggerRuntime() const; void addLastOnTriggerRuntime(std::chrono::milliseconds runtime); + std::chrono::milliseconds getAverageSessionCommitRuntime() const; + std::chrono::milliseconds getLastSessionCommitRuntime() const; + void addLastSessionCommitRuntime(std::chrono::milliseconds runtime); + std::atomic<size_t> iterations{0}; std::atomic<size_t> transferred_flow_files{0}; std::atomic<uint64_t> transferred_bytes{0}; @@ -80,6 +84,7 @@ class ProcessorMetrics : public state::response::ResponseNode { std::unordered_map<std::string, size_t> transferred_relationships_; const Processor& source_processor_; Averager<std::chrono::milliseconds> on_trigger_runtime_averager_; + Averager<std::chrono::milliseconds> session_commit_runtime_averager_; }; } // namespace org::apache::nifi::minifi::core diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index bd4705675..ffc186a96 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -187,10 +187,12 @@ void Processor::onTrigger(ProcessContext *context, ProcessSessionFactory *sessio try { // Call the virtual trigger function - const auto start = std::chrono::steady_clock::now(); + auto start = std::chrono::steady_clock::now(); onTrigger(context, session.get()); metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)); + start = std::chrono::steady_clock::now(); session->commit(); + metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)); } catch (const std::exception& exception) { logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of processor: %s (%s)", exception.what(), typeid(exception).name(), getUUIDStr(), getName()); @@ -210,10 +212,12 @@ void Processor::onTrigger(const std::shared_ptr<ProcessContext> &context, const try { // Call the virtual trigger function - const auto start = std::chrono::steady_clock::now(); + auto start = std::chrono::steady_clock::now(); onTrigger(context, session); metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)); + start = std::chrono::steady_clock::now(); session->commit(); + metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start)); } catch (std::exception &exception) { logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of processor: %s (%s)", exception.what(), typeid(exception).name(), getUUIDStr(), getName()); diff --git a/libminifi/src/core/ProcessorMetrics.cpp b/libminifi/src/core/ProcessorMetrics.cpp index e243a0178..8cd054888 100644 --- a/libminifi/src/core/ProcessorMetrics.cpp +++ b/libminifi/src/core/ProcessorMetrics.cpp @@ -26,7 +26,8 @@ namespace org::apache::nifi::minifi::core { ProcessorMetrics::ProcessorMetrics(const Processor& source_processor) : source_processor_(source_processor), - on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) { + on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT), + session_commit_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) { } std::string ProcessorMetrics::getName() const { @@ -46,6 +47,8 @@ std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize {.name = "OnTriggerInvocations", .value = static_cast<uint32_t>(iterations.load())}, {.name = "AverageOnTriggerRunTime", .value = static_cast<uint64_t>(getAverageOnTriggerRuntime().count())}, {.name = "LastOnTriggerRunTime", .value = static_cast<uint64_t>(getLastOnTriggerRuntime().count())}, + {.name = "AverageSessionCommitRunTime", .value = static_cast<uint64_t>(getAverageSessionCommitRuntime().count())}, + {.name = "LastSessionCommitRunTime", .value = static_cast<uint64_t>(getLastSessionCommitRuntime().count())}, {.name = "TransferredFlowFiles", .value = static_cast<uint32_t>(transferred_flow_files.load())}, {.name = "TransferredBytes", .value = transferred_bytes.load()} } @@ -73,6 +76,8 @@ std::vector<state::PublishedMetric> ProcessorMetrics::calculateMetrics() { {"onTrigger_invocations", static_cast<double>(iterations.load()), getCommonLabels()}, {"average_onTrigger_runtime_milliseconds", static_cast<double>(getAverageOnTriggerRuntime().count()), getCommonLabels()}, {"last_onTrigger_runtime_milliseconds", static_cast<double>(getLastOnTriggerRuntime().count()), getCommonLabels()}, + {"average_session_commit_runtime_milliseconds", static_cast<double>(getAverageSessionCommitRuntime().count()), getCommonLabels()}, + {"last_session_commit_runtime_milliseconds", static_cast<double>(getLastSessionCommitRuntime().count()), getCommonLabels()}, {"transferred_flow_files", static_cast<double>(transferred_flow_files.load()), getCommonLabels()}, {"transferred_bytes", static_cast<double>(transferred_bytes.load()), getCommonLabels()} }; @@ -105,14 +110,25 @@ std::chrono::milliseconds ProcessorMetrics::getLastOnTriggerRuntime() const { return on_trigger_runtime_averager_.getLastValue(); } +std::chrono::milliseconds ProcessorMetrics::getAverageSessionCommitRuntime() const { + return session_commit_runtime_averager_.getAverage(); +} + +void ProcessorMetrics::addLastSessionCommitRuntime(std::chrono::milliseconds runtime) { + session_commit_runtime_averager_.addValue(runtime); +} + +std::chrono::milliseconds ProcessorMetrics::getLastSessionCommitRuntime() const { + return session_commit_runtime_averager_.getLastValue(); +} template<typename ValueType> requires Summable<ValueType> && DividableByInteger<ValueType> ValueType ProcessorMetrics::Averager<ValueType>::getAverage() const { + std::lock_guard<std::mutex> lock(average_value_mutex_); if (values_.empty()) { return {}; } - std::lock_guard<std::mutex> lock(average_value_mutex_); return ranges::accumulate(values_, ValueType{}) / values_.size(); } diff --git a/libminifi/test/unit/MetricsTests.cpp b/libminifi/test/unit/MetricsTests.cpp index 20aeb7c6e..a73cecae5 100644 --- a/libminifi/test/unit/MetricsTests.cpp +++ b/libminifi/test/unit/MetricsTests.cpp @@ -209,7 +209,7 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { } } -TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") { +TEST_CASE("Test on trigger runtime processor metrics", "[ProcessorMetrics]") { DummyProcessor dummy_processor("dummy"); minifi::core::ProcessorMetrics metrics(dummy_processor); @@ -248,4 +248,43 @@ TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") { REQUIRE(metrics.getAverageOnTriggerRuntime() == 37ms); } +TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") { + DummyProcessor dummy_processor("dummy"); + minifi::core::ProcessorMetrics metrics(dummy_processor); + + REQUIRE("DummyProcessorMetrics" == metrics.getName()); + + REQUIRE(metrics.getLastSessionCommitRuntime() == 0ms); + REQUIRE(metrics.getAverageSessionCommitRuntime() == 0ms); + + metrics.addLastSessionCommitRuntime(10ms); + metrics.addLastSessionCommitRuntime(20ms); + metrics.addLastSessionCommitRuntime(30ms); + + REQUIRE(metrics.getLastSessionCommitRuntime() == 30ms); + REQUIRE(metrics.getAverageSessionCommitRuntime() == 20ms); + + for (auto i = 0; i < 7; ++i) { + metrics.addLastSessionCommitRuntime(50ms); + } + REQUIRE(metrics.getAverageSessionCommitRuntime() == 41ms); + REQUIRE(metrics.getLastSessionCommitRuntime() == 50ms); + + for (auto i = 0; i < 3; ++i) { + metrics.addLastSessionCommitRuntime(50ms); + } + REQUIRE(metrics.getAverageSessionCommitRuntime() == 50ms); + REQUIRE(metrics.getLastSessionCommitRuntime() == 50ms); + + for (auto i = 0; i < 10; ++i) { + metrics.addLastSessionCommitRuntime(40ms); + } + REQUIRE(metrics.getAverageSessionCommitRuntime() == 40ms); + REQUIRE(metrics.getLastSessionCommitRuntime() == 40ms); + + metrics.addLastSessionCommitRuntime(10ms); + REQUIRE(metrics.getLastSessionCommitRuntime() == 10ms); + REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms); +} + } // namespace org::apache::nifi::minifi::test