This is an automated email from the ASF dual-hosted git repository.

martinzink pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 1181498b81f4c6ac18c19c9eb85773556a553e5e
Author: Gabor Gyimesi <gamezb...@gmail.com>
AuthorDate: Tue Feb 7 11:13:54 2023 +0100

    MINIFICPP-2016 Add session commit time metrics
    
    Closes #1477
    
    Signed-off-by: Martin Zink <martinz...@apache.org>
---
 METRICS.md                                         | 18 +++++-----
 .../cluster/checkers/PrometheusChecker.py          |  3 +-
 libminifi/include/core/ProcessorMetrics.h          |  5 +++
 libminifi/src/core/Processor.cpp                   |  8 +++--
 libminifi/src/core/ProcessorMetrics.cpp            | 20 +++++++++--
 libminifi/test/unit/MetricsTests.cpp               | 41 +++++++++++++++++++++-
 6 files changed, 81 insertions(+), 14 deletions(-)

diff --git a/METRICS.md b/METRICS.md
index bf6ed91e4..f6c87dbaf 100644
--- a/METRICS.md
+++ b/METRICS.md
@@ -182,14 +182,16 @@ Regular expressions can also be used for requesting 
multiple processor metrics a
 
 There are general metrics that are available for all processors. Besides these 
metrics processors can implement additional metrics that are speicific to that 
processor.
 
-| Metric name                            | Labels                              
         | Description                                                          
               |
-|----------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------|
-| onTrigger_invocations                  | metric_class, processor_name, 
processor_uuid | The number of processor onTrigger calls                        
                     |
-| average_onTrigger_runtime_milliseconds | metric_class, processor_name, 
processor_uuid | The average runtime in milliseconds of the last 10 onTrigger 
calls of the processor |
-| last_onTrigger_runtime_milliseconds    | metric_class, processor_name, 
processor_uuid | The runtime in milliseconds of the last onTrigger call of the 
processor             |
-| transferred_flow_files                 | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a relationship             
                     |
-| transferred_bytes                      | metric_class, processor_name, 
processor_uuid | Number of bytes transferred to a relationship                  
                     |
-| transferred_to_\<relationship\>        | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a specific relationship    
                     |
+| Metric name                                 | Labels                         
              | Description                                                     
                         |
+|---------------------------------------------|----------------------------------------------|------------------------------------------------------------------------------------------|
+| onTrigger_invocations                       | metric_class, processor_name, 
processor_uuid | The number of processor onTrigger calls                        
                          |
+| average_onTrigger_runtime_milliseconds      | metric_class, processor_name, 
processor_uuid | The average runtime in milliseconds of the last 10 onTrigger 
calls of the processor      |
+| last_onTrigger_runtime_milliseconds         | metric_class, processor_name, 
processor_uuid | The runtime in milliseconds of the last onTrigger call of the 
processor                  |
+| average_session_commit_runtime_milliseconds | metric_class, processor_name, 
processor_uuid | The average runtime in milliseconds of the last 10 session 
commit calls of the processor |
+| last_session_commit_runtime_milliseconds    | metric_class, processor_name, 
processor_uuid | The runtime in milliseconds of the last session commit call of 
the processor             |
+| transferred_flow_files                      | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a relationship             
                          |
+| transferred_bytes                           | metric_class, processor_name, 
processor_uuid | Number of bytes transferred to a relationship                  
                          |
+| transferred_to_\<relationship\>             | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a specific relationship    
                          |
 
 | Label          | Description                                                 
           |
 
|----------------|------------------------------------------------------------------------|
diff --git a/docker/test/integration/cluster/checkers/PrometheusChecker.py 
b/docker/test/integration/cluster/checkers/PrometheusChecker.py
index 8df4bf565..461e9a5e8 100644
--- a/docker/test/integration/cluster/checkers/PrometheusChecker.py
+++ b/docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -55,7 +55,8 @@ class PrometheusChecker:
 
     def verify_general_processor_metrics(self, metric_class, processor_name):
         labels = {'processor_name': processor_name}
-        return 
self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 
'minifi_last_onTrigger_runtime_milliseconds'], metric_class, labels) and \
+        return 
self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 
'minifi_last_onTrigger_runtime_milliseconds',
+                                          
'minifi_average_session_commit_runtime_milliseconds', 
'minifi_last_session_commit_runtime_milliseconds'], metric_class, labels) and \
             
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 
'minifi_transferred_flow_files', 'minifi_transferred_to_success', 
'minifi_transferred_bytes'], metric_class, labels)
 
     def verify_getfile_metrics(self, metric_class, processor_name):
diff --git a/libminifi/include/core/ProcessorMetrics.h 
b/libminifi/include/core/ProcessorMetrics.h
index ae1aa90dc..63e1f7879 100644
--- a/libminifi/include/core/ProcessorMetrics.h
+++ b/libminifi/include/core/ProcessorMetrics.h
@@ -49,6 +49,10 @@ class ProcessorMetrics : public 
state::response::ResponseNode {
   std::chrono::milliseconds getLastOnTriggerRuntime() const;
   void addLastOnTriggerRuntime(std::chrono::milliseconds runtime);
 
+  std::chrono::milliseconds getAverageSessionCommitRuntime() const;
+  std::chrono::milliseconds getLastSessionCommitRuntime() const;
+  void addLastSessionCommitRuntime(std::chrono::milliseconds runtime);
+
   std::atomic<size_t> iterations{0};
   std::atomic<size_t> transferred_flow_files{0};
   std::atomic<uint64_t> transferred_bytes{0};
@@ -80,6 +84,7 @@ class ProcessorMetrics : public state::response::ResponseNode 
{
   std::unordered_map<std::string, size_t> transferred_relationships_;
   const Processor& source_processor_;
   Averager<std::chrono::milliseconds> on_trigger_runtime_averager_;
+  Averager<std::chrono::milliseconds> session_commit_runtime_averager_;
 };
 
 }  // namespace org::apache::nifi::minifi::core
diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp
index bd4705675..ffc186a96 100644
--- a/libminifi/src/core/Processor.cpp
+++ b/libminifi/src/core/Processor.cpp
@@ -187,10 +187,12 @@ void Processor::onTrigger(ProcessContext *context, 
ProcessSessionFactory *sessio
 
   try {
     // Call the virtual trigger function
-    const auto start = std::chrono::steady_clock::now();
+    auto start = std::chrono::steady_clock::now();
     onTrigger(context, session.get());
     
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start));
+    start = std::chrono::steady_clock::now();
     session->commit();
+    
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start));
   } catch (const std::exception& exception) {
     logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of 
processor: %s (%s)",
         exception.what(), typeid(exception).name(), getUUIDStr(), getName());
@@ -210,10 +212,12 @@ void Processor::onTrigger(const 
std::shared_ptr<ProcessContext> &context, const
 
   try {
     // Call the virtual trigger function
-    const auto start = std::chrono::steady_clock::now();
+    auto start = std::chrono::steady_clock::now();
     onTrigger(context, session);
     
metrics_->addLastOnTriggerRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start));
+    start = std::chrono::steady_clock::now();
     session->commit();
+    
metrics_->addLastSessionCommitRuntime(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 - start));
   } catch (std::exception &exception) {
     logger_->log_warn("Caught \"%s\" (%s) during Processor::onTrigger of 
processor: %s (%s)",
         exception.what(), typeid(exception).name(), getUUIDStr(), getName());
diff --git a/libminifi/src/core/ProcessorMetrics.cpp 
b/libminifi/src/core/ProcessorMetrics.cpp
index e243a0178..8cd054888 100644
--- a/libminifi/src/core/ProcessorMetrics.cpp
+++ b/libminifi/src/core/ProcessorMetrics.cpp
@@ -26,7 +26,8 @@ namespace org::apache::nifi::minifi::core {
 
 ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
     : source_processor_(source_processor),
-      on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
+      on_trigger_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT),
+      session_commit_runtime_averager_(STORED_ON_TRIGGER_RUNTIME_COUNT) {
 }
 
 std::string ProcessorMetrics::getName() const {
@@ -46,6 +47,8 @@ std::vector<state::response::SerializedResponseNode> 
ProcessorMetrics::serialize
       {.name = "OnTriggerInvocations", .value = 
static_cast<uint32_t>(iterations.load())},
       {.name = "AverageOnTriggerRunTime", .value = 
static_cast<uint64_t>(getAverageOnTriggerRuntime().count())},
       {.name = "LastOnTriggerRunTime", .value = 
static_cast<uint64_t>(getLastOnTriggerRuntime().count())},
+      {.name = "AverageSessionCommitRunTime", .value = 
static_cast<uint64_t>(getAverageSessionCommitRuntime().count())},
+      {.name = "LastSessionCommitRunTime", .value = 
static_cast<uint64_t>(getLastSessionCommitRuntime().count())},
       {.name = "TransferredFlowFiles", .value = 
static_cast<uint32_t>(transferred_flow_files.load())},
       {.name = "TransferredBytes", .value = transferred_bytes.load()}
     }
@@ -73,6 +76,8 @@ std::vector<state::PublishedMetric> 
ProcessorMetrics::calculateMetrics() {
     {"onTrigger_invocations", static_cast<double>(iterations.load()), 
getCommonLabels()},
     {"average_onTrigger_runtime_milliseconds", 
static_cast<double>(getAverageOnTriggerRuntime().count()), getCommonLabels()},
     {"last_onTrigger_runtime_milliseconds", 
static_cast<double>(getLastOnTriggerRuntime().count()), getCommonLabels()},
+    {"average_session_commit_runtime_milliseconds", 
static_cast<double>(getAverageSessionCommitRuntime().count()), 
getCommonLabels()},
+    {"last_session_commit_runtime_milliseconds", 
static_cast<double>(getLastSessionCommitRuntime().count()), getCommonLabels()},
     {"transferred_flow_files", 
static_cast<double>(transferred_flow_files.load()), getCommonLabels()},
     {"transferred_bytes", static_cast<double>(transferred_bytes.load()), 
getCommonLabels()}
   };
@@ -105,14 +110,25 @@ std::chrono::milliseconds 
ProcessorMetrics::getLastOnTriggerRuntime() const {
   return on_trigger_runtime_averager_.getLastValue();
 }
 
+std::chrono::milliseconds ProcessorMetrics::getAverageSessionCommitRuntime() 
const {
+  return session_commit_runtime_averager_.getAverage();
+}
+
+void ProcessorMetrics::addLastSessionCommitRuntime(std::chrono::milliseconds 
runtime) {
+  session_commit_runtime_averager_.addValue(runtime);
+}
+
+std::chrono::milliseconds ProcessorMetrics::getLastSessionCommitRuntime() 
const {
+  return session_commit_runtime_averager_.getLastValue();
+}
 
 template<typename ValueType>
 requires Summable<ValueType> && DividableByInteger<ValueType>
 ValueType ProcessorMetrics::Averager<ValueType>::getAverage() const {
+  std::lock_guard<std::mutex> lock(average_value_mutex_);
   if (values_.empty()) {
     return {};
   }
-  std::lock_guard<std::mutex> lock(average_value_mutex_);
   return ranges::accumulate(values_, ValueType{}) / values_.size();
 }
 
diff --git a/libminifi/test/unit/MetricsTests.cpp 
b/libminifi/test/unit/MetricsTests.cpp
index 20aeb7c6e..a73cecae5 100644
--- a/libminifi/test/unit/MetricsTests.cpp
+++ b/libminifi/test/unit/MetricsTests.cpp
@@ -209,7 +209,7 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
   }
 }
 
-TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
+TEST_CASE("Test on trigger runtime processor metrics", "[ProcessorMetrics]") {
   DummyProcessor dummy_processor("dummy");
   minifi::core::ProcessorMetrics metrics(dummy_processor);
 
@@ -248,4 +248,43 @@ TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
   REQUIRE(metrics.getAverageOnTriggerRuntime() == 37ms);
 }
 
+TEST_CASE("Test commit runtime processor metrics", "[ProcessorMetrics]") {
+  DummyProcessor dummy_processor("dummy");
+  minifi::core::ProcessorMetrics metrics(dummy_processor);
+
+  REQUIRE("DummyProcessorMetrics" == metrics.getName());
+
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 0ms);
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 0ms);
+
+  metrics.addLastSessionCommitRuntime(10ms);
+  metrics.addLastSessionCommitRuntime(20ms);
+  metrics.addLastSessionCommitRuntime(30ms);
+
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 30ms);
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 20ms);
+
+  for (auto i = 0; i < 7; ++i) {
+    metrics.addLastSessionCommitRuntime(50ms);
+  }
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 41ms);
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 50ms);
+
+  for (auto i = 0; i < 3; ++i) {
+    metrics.addLastSessionCommitRuntime(50ms);
+  }
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 50ms);
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 50ms);
+
+  for (auto i = 0; i < 10; ++i) {
+    metrics.addLastSessionCommitRuntime(40ms);
+  }
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 40ms);
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 40ms);
+
+  metrics.addLastSessionCommitRuntime(10ms);
+  REQUIRE(metrics.getLastSessionCommitRuntime() == 10ms);
+  REQUIRE(metrics.getAverageSessionCommitRuntime() == 37ms);
+}
+
 }  // namespace org::apache::nifi::minifi::test

Reply via email to