fgerlits commented on code in PR #1400:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r970642085


##########
METRICS.md:
##########
@@ -153,3 +132,40 @@ DeviceInfoNode is a system level metric that reports 
metrics about the system re
 | connection_name | Name of the connection defined in the flow configuration   
  |
 | component_uuid  | UUID of the component                                      
  |
 | component_name  | Name of the component                                      
  |
+
+## Processor Metrics
+
+Processor level metrics can be accessed for any processor provided by MiNiFi. 
These metrics correspond to the name of the processor appended by the "Metrics" 
suffix (e.g. GetFileMetrics, TailFileMetrics, etc.).
+
+### General Metrics
+
+There are general metrics that are available for all processors. Besides these 
metrics, processors can implement additional metrics that are specific to that 
processor.
+
+| Metric name                            | Labels                              
         | Description                                                          
               |
+|----------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------|
+| average_onTrigger_runtime_milliseconds | metric_class, processor_name, 
processor_uuid | The average runtime in milliseconds of the last 10 onTrigger 
calls of the processor |
+| last_onTrigger_runtime_milliseconds    | metric_class, processor_name, 
processor_uuid | The runtime in milliseconds of the last onTrigger call of the 
processor             |
+| transferred_flow_files                 | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a relationship             
                     |
+| transferred_bytes                      | metric_class, processor_name, 
processor_uuid | Number of bytes transferred to a relationship                  
                     |
+| transferred_to_\<relationship\>        | metric_class, processor_name, 
processor_uuid | Number of flow files transferred to a specific relationship    
                     |
+
+| Label          | Description                                                 
   |
+|----------------|----------------------------------------------------------------|
+| metric_class   | Class name to filter for this metric, set to GetFileMetrics 
   |

Review Comment:
   I think this should be `<processor type>Metrics` instead of `GetFileMetrics`



##########
extensions/standard-processors/processors/GetFile.h:
##########
@@ -48,70 +48,46 @@ struct GetFileRequest {
   std::string inputDirectory;
 };
 
-class GetFileMetrics : public state::response::ResponseNode {
+class GetFileMetrics : public core::ProcessorMetrics {
  public:
-  explicit GetFileMetrics(const CoreComponent& source_component)
-    : state::response::ResponseNode("GetFileMetrics"),
-      source_component_(source_component) {
-  }
-
-  std::string getName() const override {
-    return core::Connectable::getName();
+  explicit GetFileMetrics(const core::Processor& source_processor)
+    : core::ProcessorMetrics(source_processor) {
   }
 
   std::vector<state::response::SerializedResponseNode> serialize() override {
-    std::vector<state::response::SerializedResponseNode> resp;
-
-    state::response::SerializedResponseNode root_node;
-    root_node.name = source_component_.getUUIDStr();
-
-    state::response::SerializedResponseNode iter;
-    iter.name = "OnTriggerInvocations";
-    iter.value = (uint32_t)iterations_.load();
+    auto resp = core::ProcessorMetrics::serialize();
+    auto& root_node = resp[0];
 
-    root_node.children.push_back(iter);
+    state::response::SerializedResponseNode accepted_files_node;
+    accepted_files_node.name = "AcceptedFiles";
+    accepted_files_node.value = (uint32_t)accepted_files.load();
 
-    state::response::SerializedResponseNode accepted_files;
-    accepted_files.name = "AcceptedFiles";
-    accepted_files.value = (uint32_t)accepted_files_.load();
+    root_node.children.push_back(accepted_files_node);
 
-    root_node.children.push_back(accepted_files);
+    state::response::SerializedResponseNode input_bytes_node;
+    input_bytes_node.name = "InputBytes";
+    input_bytes_node.value = (uint32_t)input_bytes.load();

Review Comment:
   I don't think we can assume that `input_bytes` is less than 4 GB.  Can we 
change the type of this ValueNode to `uint64_t`?
   
   Also, I would change the types of `accepted_files` and `input_bytes` to what 
we want them to be (probably `uint32_t` and `uint64_t`) instead of making them 
`size_t` and then casting.



##########
extensions/standard-processors/tests/unit/ProcessorTests.cpp:
##########
@@ -822,3 +822,11 @@ TEST_CASE("isSingleThreaded - two threads for a single 
threaded processor", "[is
   REQUIRE(LogTestController::getInstance().contains("[warning] Processor 
myProc can not be run in parallel, its "
                                                     "\"max concurrent tasks\" 
value is too high. It was set to 1 from 2."));
 }
+
+TEST_CASE("Test getProcessorName", "[getProcessorName]") {

Review Comment:
   this looks like a typo:
   ```suggestion
   TEST_CASE("Test getProcessorType", "[getProcessorType]") {
   ```



##########
libminifi/include/core/Processor.h:
##########
@@ -29,19 +29,36 @@
 #include <unordered_set>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "ConfigurableComponent.h"
 #include "Connectable.h"
 #include "Core.h"
 #include "core/Annotation.h"
 #include "Scheduling.h"
 #include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "utils/gsl.h"
+
+#if WIN32

Review Comment:
   This may well work, but we use `#ifdef WIN32` everywhere else, so I would use that here too for consistency.



##########
libminifi/include/core/Processor.h:
##########
@@ -62,10 +79,41 @@ constexpr std::chrono::nanoseconds 
MINIMUM_SCHEDULING_NANOS{30000};
 
 #define BUILDING_DLL 1
 
-class Processor : public Connectable, public ConfigurableComponent {
+class Processor;
+
+class ProcessorMetrics : public state::response::ResponseNode {
+ public:
+  explicit ProcessorMetrics(const Processor& source_processor);
+
+  [[nodiscard]] std::string getName() const override;
+
+  std::vector<state::response::SerializedResponseNode> serialize() override;
+  std::vector<state::PublishedMetric> calculateMetrics() override;
+  void incrementRelationshipTransferCount(const std::string& relationship);
+  std::chrono::milliseconds getAverageOnTriggerRuntime() const;
+  std::chrono::milliseconds getLastOnTriggerRuntime() const;
+  void addLastOnTriggerRuntime(std::chrono::milliseconds runtime);
+
+  std::atomic<size_t> iterations{0};
+  std::atomic<size_t> transferred_flow_files{0};
+  std::atomic<uint64_t> transferred_bytes{0};
+
+ protected:
+  [[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() 
const;
+  static const uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;
+
+  std::mutex relationship_mutex_;

Review Comment:
   I would rename the mutex to make it clearer what it guards:
   ```suggestion
     std::mutex transferred_relationships_mutex_;
   ```



##########
libminifi/test/unit/ProcessSessionTests.cpp:
##########
@@ -28,22 +28,6 @@
 
 namespace {
 
-class DummyProcessor : public minifi::core::Processor {
-  using minifi::core::Processor::Processor;
-
- public:
-  static constexpr const char* Description = "A processor that does nothing.";
-  static auto properties() { return std::array<core::Property, 0>{}; }
-  static auto relationships() { return std::array<core::Relationship, 0>{}; }
-  static constexpr bool SupportsDynamicProperties = false;
-  static constexpr bool SupportsDynamicRelationships = false;
-  static constexpr core::annotation::Input InputRequirement = 
core::annotation::Input::INPUT_ALLOWED;
-  static constexpr bool IsSingleThreaded = false;
-  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
-};
-
-REGISTER_RESOURCE(DummyProcessor, Processor);

Review Comment:
   This is a good idea.  There are two more identical copies of this dummy 
processor, in `ContentRepositoryDependentTests.h` and 
`GCPCredentialsControllerServiceTests.cpp`; can you get rid of them, too, 
please?



##########
docker/test/integration/minifi/core/PrometheusChecker.py:
##########
@@ -0,0 +1,90 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+    def __init__(self):
+        self.prometheus_client = PrometheusConnect(url="http://localhost:9090")

Review Comment:
   There used to be a `disable_ssl=True` here; are we connecting using SSL now?



##########
libminifi/include/core/ProcessSession.h:
##########
@@ -37,6 +37,7 @@
 #include "WeakReference.h"
 #include "provenance/Provenance.h"
 #include "utils/gsl.h"
+#include "Processor.h"

Review Comment:
   I would move `ProcessorMetrics` to its own files (both `.h` and `.cpp`) and 
only include `ProcessorMetrics.h` instead of all of `Processor.h`.



##########
libminifi/src/core/Processor.cpp:
##########
@@ -40,9 +41,133 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
-Processor::Processor(const std::string& name)
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor) {
+  on_trigger_runtimes_.reserve(STORED_ON_TRIGGER_RUNTIME_COUNT);

Review Comment:
   `on_trigger_runtimes_`, together with `next_average_index_`, the mutex, and 
the functions using them, look like they could be packaged in a useful and 
reusable `Averager` class.



##########
docker/test/integration/minifi/core/PrometheusChecker.py:
##########
@@ -0,0 +1,90 @@
+import time
+from prometheus_api_client import PrometheusConnect
+
+
+class PrometheusChecker:
+    def __init__(self):
+        self.prometheus_client = PrometheusConnect(url="http://localhost:9090")
+
+    def wait_for_metric_class_on_prometheus(self, metric_class, 
timeout_seconds):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_metric_class(metric_class):
+                return True
+            time.sleep(1)
+        return False
+
+    def wait_for_processor_metric_on_prometheus(self, metric_class, 
timeout_seconds, processor_name):
+        start_time = time.perf_counter()
+        while (time.perf_counter() - start_time) < timeout_seconds:
+            if self.verify_processor_metric(metric_class, processor_name):
+                return True
+            time.sleep(1)
+        return False
+
+    def verify_processor_metric(self, metric_class, processor_name):
+        if metric_class == "GetFileMetrics":
+            return self.verify_getfile_metrics(metric_class, processor_name)
+        else:
+            return self.verify_general_processor_metrics(metric_class, 
processor_name)
+
+    def verify_metric_class(self, metric_class):
+        if metric_class == "RepositoryMetrics":
+            return self.verify_repository_metrics()
+        elif metric_class == "QueueMetrics":
+            return self.verify_queue_metrics()
+        elif metric_class == "FlowInformation":
+            return self.verify_flow_information_metrics()
+        elif metric_class == "DeviceInfoNode":
+            return self.verify_device_info_node_metrics()
+        else:
+            raise Exception("Metric class '%s' verification is not 
implemented" % metric_class)
+
+    def verify_repository_metrics(self):
+        label_list = [{'repository_name': 'provenance'}, {'repository_name': 
'flowfile'}]
+        for labels in label_list:
+            if not self.verify_metrics_exist(['minifi_is_running', 
'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels):
+                return False
+        return True
+
+    def verify_queue_metrics(self):
+        return self.verify_metrics_exist(['minifi_queue_data_size', 
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 
'QueueMetrics')
+
+    def verify_general_processor_metrics(self, metric_class, processor_name):
+        labels = {'processor_name': processor_name}
+        return 
self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 
'minifi_last_onTrigger_runtime_milliseconds'], metric_class, labels) and \
+            
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 
'minifi_transferred_flow_files', 'minifi_transferred_to_success', 
'minifi_transferred_bytes'], metric_class, labels)
+
+    def verify_getfile_metrics(self, metric_class, processor_name):
+        labels = {'processor_name': processor_name}
+        return self.verify_general_processor_metrics(metric_class, 
processor_name) and \
+            self.verify_metrics_exist(['minifi_input_bytes', 
'minifi_accepted_files'], metric_class, labels)
+
+    def verify_flow_information_metrics(self):
+        return self.verify_metrics_exist(['minifi_queue_data_size', 
'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 
'FlowInformation') and \
+            self.verify_metric_exists('minifi_is_running', 'FlowInformation', 
{'component_name': 'FlowController'})
+
+    def verify_device_info_node_metrics(self):
+        return self.verify_metrics_exist(['minifi_physical_mem', 
'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode')
+
+    def verify_metric_exists(self, metric_name, metric_class, labels={}):
+        labels['metric_class'] = metric_class
+        return 
len(self.prometheus_client.get_current_metric_value(metric_name=metric_name, 
label_config=labels)) > 0
+
+    def verify_metrics_exist(self, metric_names, metric_class, labels={}):
+        for metric_name in metric_names:
+            if not self.verify_metric_exists(metric_name, metric_class, 
labels):
+                return False
+        return True
+
+    def verify_metric_larger_than_zero(self, metric_name, metric_class, 
labels={}):
+        labels['metric_class'] = metric_class
+        result = 
self.prometheus_client.get_current_metric_value(metric_name=metric_name, 
label_config=labels)
+        print(result)

Review Comment:
   is this a debug log left here by accident?



##########
libminifi/include/core/Processor.h:
##########
@@ -29,19 +29,36 @@
 #include <unordered_set>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "ConfigurableComponent.h"
 #include "Connectable.h"
 #include "Core.h"
 #include "core/Annotation.h"
 #include "Scheduling.h"
 #include "utils/TimeUtil.h"
+#include "core/state/nodes/MetricsBase.h"
+#include "utils/gsl.h"
+
+#if WIN32
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    return org::apache::nifi::minifi::utils::StringUtils::split(__FUNCDNAME__, 
"@")[1]; \
+  }
+#else
+#define ADD_GET_PROCESSOR_NAME \
+  std::string getProcessorType() const override { \
+    auto splitted = 
org::apache::nifi::minifi::utils::StringUtils::split(__PRETTY_FUNCTION__, 
"::"); \
+    return splitted[splitted.size() - 2]; \
+  }
+#endif

Review Comment:
   I think using `typeid(*this).name()` would be better.  We would still need 
compiler-specific code, but it could be hidden in a 
`utils::demangleClassName()` function.



##########
libminifi/src/core/Processor.cpp:
##########
@@ -40,9 +41,133 @@ using namespace std::literals::chrono_literals;
 
 namespace org::apache::nifi::minifi::core {
 
-Processor::Processor(const std::string& name)
+ProcessorMetrics::ProcessorMetrics(const Processor& source_processor)
+    : source_processor_(source_processor) {
+  on_trigger_runtimes_.reserve(STORED_ON_TRIGGER_RUNTIME_COUNT);
+}
+
+std::string ProcessorMetrics::getName() const {
+  return source_processor_.getProcessorType() + "Metrics";
+}
+
+std::unordered_map<std::string, std::string> 
ProcessorMetrics::getCommonLabels() const {
+  return {{"metric_class", getName()}, {"processor_name", 
source_processor_.getName()}, {"processor_uuid", 
source_processor_.getUUIDStr()}};
+}
+
+std::vector<state::response::SerializedResponseNode> 
ProcessorMetrics::serialize() {
+  std::vector<state::response::SerializedResponseNode> resp;
+
+  state::response::SerializedResponseNode root_node;
+  root_node.name = source_processor_.getUUIDStr();
+
+  state::response::SerializedResponseNode iter;
+  iter.name = "OnTriggerInvocations";
+  iter.value = static_cast<uint32_t>(iterations.load());
+
+  root_node.children.push_back(iter);
+
+  state::response::SerializedResponseNode average_ontrigger_runtime_node;
+  average_ontrigger_runtime_node.name = "AverageOnTriggerRunTime";
+  average_ontrigger_runtime_node.value = 
static_cast<uint64_t>(getAverageOnTriggerRuntime().count());
+
+  root_node.children.push_back(average_ontrigger_runtime_node);
+
+  state::response::SerializedResponseNode last_ontrigger_runtime_node;
+  last_ontrigger_runtime_node.name = "LastOnTriggerRunTime";
+  last_ontrigger_runtime_node.value = 
static_cast<uint64_t>(getLastOnTriggerRuntime().count());
+
+  root_node.children.push_back(last_ontrigger_runtime_node);
+
+  state::response::SerializedResponseNode transferred_flow_files_node;
+  transferred_flow_files_node.name = "TransferredFlowFiles";
+  transferred_flow_files_node.value = 
static_cast<uint32_t>(transferred_flow_files.load());
+
+  root_node.children.push_back(transferred_flow_files_node);
+
+  for (const auto& [relationship, count] : transferred_relationships_) {
+    state::response::SerializedResponseNode transferred_to_relationship_node;
+    transferred_to_relationship_node.name = 
std::string("TransferredTo").append(1, 
toupper(relationship[0])).append(relationship.substr(1));
+    transferred_to_relationship_node.value = static_cast<uint32_t>(count);
+
+    root_node.children.push_back(transferred_to_relationship_node);
+  }

Review Comment:
   This may be overly paranoid, but I would add a 
`gsl_Expects(relationship.size() > 0)` before line 89.



##########
libminifi/test/unit/MetricsTests.cpp:
##########
@@ -203,3 +208,32 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") {
     REQUIRE("0" == size.value);
   }
 }
+
+TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") {
+  DummyProcessor dummy_processor("dummy");
+  minifi::core::ProcessorMetrics metrics(dummy_processor);
+
+  REQUIRE("DummyProcessorMetrics" == metrics.getName());
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 0ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 0ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  metrics.addLastOnTriggerRuntime(20ms);
+  metrics.addLastOnTriggerRuntime(30ms);
+
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 30ms);
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 20ms);
+
+  for (auto i = 0; i < 10; ++i) {
+    metrics.addLastOnTriggerRuntime(50ms);
+  }
+
+  REQUIRE(metrics.getAverageOnTriggerRuntime() == 50ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms);
+
+  metrics.addLastOnTriggerRuntime(10ms);
+  REQUIRE(metrics.getLastOnTriggerRuntime() == 10ms);
+}

Review Comment:
   we could add
   ```c++
     REQUIRE(metrics.getAverageOnTriggerRuntime() == 46ms);
   ```
   at the end, as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to