fgerlits commented on code in PR #1400: URL: https://github.com/apache/nifi-minifi-cpp/pull/1400#discussion_r970642085
########## METRICS.md: ########## @@ -153,3 +132,40 @@ DeviceInfoNode is a system level metric that reports metrics about the system re | connection_name | Name of the connection defined in the flow configuration | | component_uuid | UUID of the component | | component_name | Name of the component | + +## Processor Metrics + +Processor level metrics can be accessed for any processor provided by MiNiFi. These metrics correspond to the name of the processor appended by the "Metrics" suffix (e.g. GetFileMetrics, TailFileMetrics, etc.). + +### General Metrics + +There are general metrics that are available for all processors. Besides these metrics processors can implement additional metrics that are specific to that processor. + +| Metric name | Labels | Description | +|----------------------------------------|----------------------------------------------|-------------------------------------------------------------------------------------| +| average_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The average runtime in milliseconds of the last 10 onTrigger calls of the processor | +| last_onTrigger_runtime_milliseconds | metric_class, processor_name, processor_uuid | The runtime in milliseconds of the last onTrigger call of the processor | +| transferred_flow_files | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship | +| transferred_bytes | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship | +| transferred_to_\<relationship\> | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship | + +| Label | Description | +|----------------|----------------------------------------------------------------| +| metric_class | Class name to filter for this metric, set to GetFileMetrics | Review Comment: I think this should be `<processor type>Metrics` instead of `GetFileMetrics` ##########
extensions/standard-processors/processors/GetFile.h: ########## @@ -48,70 +48,46 @@ struct GetFileRequest { std::string inputDirectory; }; -class GetFileMetrics : public state::response::ResponseNode { +class GetFileMetrics : public core::ProcessorMetrics { public: - explicit GetFileMetrics(const CoreComponent& source_component) - : state::response::ResponseNode("GetFileMetrics"), - source_component_(source_component) { - } - - std::string getName() const override { - return core::Connectable::getName(); + explicit GetFileMetrics(const core::Processor& source_processor) + : core::ProcessorMetrics(source_processor) { } std::vector<state::response::SerializedResponseNode> serialize() override { - std::vector<state::response::SerializedResponseNode> resp; - - state::response::SerializedResponseNode root_node; - root_node.name = source_component_.getUUIDStr(); - - state::response::SerializedResponseNode iter; - iter.name = "OnTriggerInvocations"; - iter.value = (uint32_t)iterations_.load(); + auto resp = core::ProcessorMetrics::serialize(); + auto& root_node = resp[0]; - root_node.children.push_back(iter); + state::response::SerializedResponseNode accepted_files_node; + accepted_files_node.name = "AcceptedFiles"; + accepted_files_node.value = (uint32_t)accepted_files.load(); - state::response::SerializedResponseNode accepted_files; - accepted_files.name = "AcceptedFiles"; - accepted_files.value = (uint32_t)accepted_files_.load(); + root_node.children.push_back(accepted_files_node); - root_node.children.push_back(accepted_files); + state::response::SerializedResponseNode input_bytes_node; + input_bytes_node.name = "InputBytes"; + input_bytes_node.value = (uint32_t)input_bytes.load(); Review Comment: I don't think we can assume that `input_bytes` is less than 4 GB. Can we change the type of this ValueNode to `uint64_t`? 
Also, I would change the types of `accepted_files` and `input_bytes` to what we want them to be (probably `uint32_t` and `uint64_t`) instead of making them `size_t` and then casting. ########## extensions/standard-processors/tests/unit/ProcessorTests.cpp: ########## @@ -822,3 +822,11 @@ TEST_CASE("isSingleThreaded - two threads for a single threaded processor", "[is REQUIRE(LogTestController::getInstance().contains("[warning] Processor myProc can not be run in parallel, its " "\"max concurrent tasks\" value is too high. It was set to 1 from 2.")); } + +TEST_CASE("Test getProcessorName", "[getProcessorName]") { Review Comment: this looks like a typo: ```suggestion TEST_CASE("Test getProcessorType", "[getProcessorType]") { ``` ########## libminifi/include/core/Processor.h: ########## @@ -29,19 +29,36 @@ #include <unordered_set> #include <unordered_map> #include <utility> +#include <vector> #include "ConfigurableComponent.h" #include "Connectable.h" #include "Core.h" #include "core/Annotation.h" #include "Scheduling.h" #include "utils/TimeUtil.h" +#include "core/state/nodes/MetricsBase.h" +#include "utils/gsl.h" + +#if WIN32 Review Comment: I'm not sure this doesn't work, but we use `#ifdef WIN32` everywhere else ########## libminifi/include/core/Processor.h: ########## @@ -62,10 +79,41 @@ constexpr std::chrono::nanoseconds MINIMUM_SCHEDULING_NANOS{30000}; #define BUILDING_DLL 1 -class Processor : public Connectable, public ConfigurableComponent { +class Processor; + +class ProcessorMetrics : public state::response::ResponseNode { + public: + explicit ProcessorMetrics(const Processor& source_processor); + + [[nodiscard]] std::string getName() const override; + + std::vector<state::response::SerializedResponseNode> serialize() override; + std::vector<state::PublishedMetric> calculateMetrics() override; + void incrementRelationshipTransferCount(const std::string& relationship); + std::chrono::milliseconds getAverageOnTriggerRuntime() const; + std::chrono::milliseconds 
getLastOnTriggerRuntime() const; + void addLastOnTriggerRuntime(std::chrono::milliseconds runtime); + + std::atomic<size_t> iterations{0}; + std::atomic<size_t> transferred_flow_files{0}; + std::atomic<uint64_t> transferred_bytes{0}; + + protected: + [[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() const; + static const uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10; + + std::mutex relationship_mutex_; Review Comment: I would rename the mutex to make it clearer what it guards: ```suggestion std::mutex transferred_relationships_mutex_; ``` ########## libminifi/test/unit/ProcessSessionTests.cpp: ########## @@ -28,22 +28,6 @@ namespace { -class DummyProcessor : public minifi::core::Processor { - using minifi::core::Processor::Processor; - - public: - static constexpr const char* Description = "A processor that does nothing."; - static auto properties() { return std::array<core::Property, 0>{}; } - static auto relationships() { return std::array<core::Relationship, 0>{}; } - static constexpr bool SupportsDynamicProperties = false; - static constexpr bool SupportsDynamicRelationships = false; - static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_ALLOWED; - static constexpr bool IsSingleThreaded = false; - ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS -}; - -REGISTER_RESOURCE(DummyProcessor, Processor); Review Comment: This is a good idea. There are two more identical copies of this dummy processor, in `ContentRepositoryDependentTests.h` and `GCPCredentialsControllerServiceTests.cpp`; can you get rid of them, too, please? ########## docker/test/integration/minifi/core/PrometheusChecker.py: ########## @@ -0,0 +1,90 @@ +import time +from prometheus_api_client import PrometheusConnect + + +class PrometheusChecker: + def __init__(self): + self.prometheus_client = PrometheusConnect(url="http://localhost:9090") Review Comment: There used to be a `disable_ssl=True` here; are we connecting using SSL now? 
########## libminifi/include/core/ProcessSession.h: ########## @@ -37,6 +37,7 @@ #include "WeakReference.h" #include "provenance/Provenance.h" #include "utils/gsl.h" +#include "Processor.h" Review Comment: I would move `ProcessorMetrics` to its own files (both `.h` and `.cpp`) and only include `ProcessorMetrics.h` instead of all of `Processor.h`. ########## libminifi/src/core/Processor.cpp: ########## @@ -40,9 +41,133 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core { -Processor::Processor(const std::string& name) +ProcessorMetrics::ProcessorMetrics(const Processor& source_processor) + : source_processor_(source_processor) { + on_trigger_runtimes_.reserve(STORED_ON_TRIGGER_RUNTIME_COUNT); Review Comment: `on_trigger_runtimes_`, together with `next_average_index_`, the mutex, and the functions using them, look like they could be packaged in a useful and reusable `Averager` class. ########## docker/test/integration/minifi/core/PrometheusChecker.py: ########## @@ -0,0 +1,90 @@ +import time +from prometheus_api_client import PrometheusConnect + + +class PrometheusChecker: + def __init__(self): + self.prometheus_client = PrometheusConnect(url="http://localhost:9090") + + def wait_for_metric_class_on_prometheus(self, metric_class, timeout_seconds): + start_time = time.perf_counter() + while (time.perf_counter() - start_time) < timeout_seconds: + if self.verify_metric_class(metric_class): + return True + time.sleep(1) + return False + + def wait_for_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name): + start_time = time.perf_counter() + while (time.perf_counter() - start_time) < timeout_seconds: + if self.verify_processor_metric(metric_class, processor_name): + return True + time.sleep(1) + return False + + def verify_processor_metric(self, metric_class, processor_name): + if metric_class == "GetFileMetrics": + return self.verify_getfile_metrics(metric_class, processor_name) + else: + return 
self.verify_general_processor_metrics(metric_class, processor_name) + + def verify_metric_class(self, metric_class): + if metric_class == "RepositoryMetrics": + return self.verify_repository_metrics() + elif metric_class == "QueueMetrics": + return self.verify_queue_metrics() + elif metric_class == "FlowInformation": + return self.verify_flow_information_metrics() + elif metric_class == "DeviceInfoNode": + return self.verify_device_info_node_metrics() + else: + raise Exception("Metric class '%s' verification is not implemented" % metric_class) + + def verify_repository_metrics(self): + label_list = [{'repository_name': 'provenance'}, {'repository_name': 'flowfile'}] + for labels in label_list: + if not self.verify_metrics_exist(['minifi_is_running', 'minifi_is_full', 'minifi_repository_size'], 'RepositoryMetrics', labels): + return False + return True + + def verify_queue_metrics(self): + return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'QueueMetrics') + + def verify_general_processor_metrics(self, metric_class, processor_name): + labels = {'processor_name': processor_name} + return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds'], metric_class, labels) and \ + self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', 'minifi_transferred_bytes'], metric_class, labels) + + def verify_getfile_metrics(self, metric_class, processor_name): + labels = {'processor_name': processor_name} + return self.verify_general_processor_metrics(metric_class, processor_name) and \ + self.verify_metrics_exist(['minifi_input_bytes', 'minifi_accepted_files'], metric_class, labels) + + def verify_flow_information_metrics(self): + return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 
'minifi_queue_size_max'], 'FlowInformation') and \ + self.verify_metric_exists('minifi_is_running', 'FlowInformation', {'component_name': 'FlowController'}) + + def verify_device_info_node_metrics(self): + return self.verify_metrics_exist(['minifi_physical_mem', 'minifi_memory_usage', 'minifi_cpu_utilization'], 'DeviceInfoNode') + + def verify_metric_exists(self, metric_name, metric_class, labels={}): + labels['metric_class'] = metric_class + return len(self.prometheus_client.get_current_metric_value(metric_name=metric_name, label_config=labels)) > 0 + + def verify_metrics_exist(self, metric_names, metric_class, labels={}): + for metric_name in metric_names: + if not self.verify_metric_exists(metric_name, metric_class, labels): + return False + return True + + def verify_metric_larger_than_zero(self, metric_name, metric_class, labels={}): + labels['metric_class'] = metric_class + result = self.prometheus_client.get_current_metric_value(metric_name=metric_name, label_config=labels) + print(result) Review Comment: is this a debug log left here by accident? ########## libminifi/include/core/Processor.h: ########## @@ -29,19 +29,36 @@ #include <unordered_set> #include <unordered_map> #include <utility> +#include <vector> #include "ConfigurableComponent.h" #include "Connectable.h" #include "Core.h" #include "core/Annotation.h" #include "Scheduling.h" #include "utils/TimeUtil.h" +#include "core/state/nodes/MetricsBase.h" +#include "utils/gsl.h" + +#if WIN32 +#define ADD_GET_PROCESSOR_NAME \ + std::string getProcessorType() const override { \ + return org::apache::nifi::minifi::utils::StringUtils::split(__FUNCDNAME__, "@")[1]; \ + } +#else +#define ADD_GET_PROCESSOR_NAME \ + std::string getProcessorType() const override { \ + auto splitted = org::apache::nifi::minifi::utils::StringUtils::split(__PRETTY_FUNCTION__, "::"); \ + return splitted[splitted.size() - 2]; \ + } +#endif Review Comment: I think using `typeid(*this).name()` would be better. 
We would still need compiler-specific code, but it could be hidden in a `utils::demangleClassName()` function. ########## libminifi/src/core/Processor.cpp: ########## @@ -40,9 +41,133 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::core { -Processor::Processor(const std::string& name) +ProcessorMetrics::ProcessorMetrics(const Processor& source_processor) + : source_processor_(source_processor) { + on_trigger_runtimes_.reserve(STORED_ON_TRIGGER_RUNTIME_COUNT); +} + +std::string ProcessorMetrics::getName() const { + return source_processor_.getProcessorType() + "Metrics"; +} + +std::unordered_map<std::string, std::string> ProcessorMetrics::getCommonLabels() const { + return {{"metric_class", getName()}, {"processor_name", source_processor_.getName()}, {"processor_uuid", source_processor_.getUUIDStr()}}; +} + +std::vector<state::response::SerializedResponseNode> ProcessorMetrics::serialize() { + std::vector<state::response::SerializedResponseNode> resp; + + state::response::SerializedResponseNode root_node; + root_node.name = source_processor_.getUUIDStr(); + + state::response::SerializedResponseNode iter; + iter.name = "OnTriggerInvocations"; + iter.value = static_cast<uint32_t>(iterations.load()); + + root_node.children.push_back(iter); + + state::response::SerializedResponseNode average_ontrigger_runtime_node; + average_ontrigger_runtime_node.name = "AverageOnTriggerRunTime"; + average_ontrigger_runtime_node.value = static_cast<uint64_t>(getAverageOnTriggerRuntime().count()); + + root_node.children.push_back(average_ontrigger_runtime_node); + + state::response::SerializedResponseNode last_ontrigger_runtime_node; + last_ontrigger_runtime_node.name = "LastOnTriggerRunTime"; + last_ontrigger_runtime_node.value = static_cast<uint64_t>(getLastOnTriggerRuntime().count()); + + root_node.children.push_back(last_ontrigger_runtime_node); + + state::response::SerializedResponseNode transferred_flow_files_node; + 
transferred_flow_files_node.name = "TransferredFlowFiles"; + transferred_flow_files_node.value = static_cast<uint32_t>(transferred_flow_files.load()); + + root_node.children.push_back(transferred_flow_files_node); + + for (const auto& [relationship, count] : transferred_relationships_) { + state::response::SerializedResponseNode transferred_to_relationship_node; + transferred_to_relationship_node.name = std::string("TransferredTo").append(1, toupper(relationship[0])).append(relationship.substr(1)); + transferred_to_relationship_node.value = static_cast<uint32_t>(count); + + root_node.children.push_back(transferred_to_relationship_node); + } Review Comment: This may be overly paranoid, but I would add a `gsl_Expects(relationship.size() > 0)` before line 89. ########## libminifi/test/unit/MetricsTests.cpp: ########## @@ -203,3 +208,32 @@ TEST_CASE("RepositorymetricsHaveRepo", "[c2m4]") { REQUIRE("0" == size.value); } } + +TEST_CASE("Test ProcessorMetrics", "[ProcessorMetrics]") { + DummyProcessor dummy_processor("dummy"); + minifi::core::ProcessorMetrics metrics(dummy_processor); + + REQUIRE("DummyProcessorMetrics" == metrics.getName()); + + REQUIRE(metrics.getLastOnTriggerRuntime() == 0ms); + REQUIRE(metrics.getAverageOnTriggerRuntime() == 0ms); + + metrics.addLastOnTriggerRuntime(10ms); + metrics.addLastOnTriggerRuntime(20ms); + metrics.addLastOnTriggerRuntime(30ms); + + REQUIRE(metrics.getLastOnTriggerRuntime() == 30ms); + REQUIRE(metrics.getAverageOnTriggerRuntime() == 20ms); + + for (auto i = 0; i < 10; ++i) { + metrics.addLastOnTriggerRuntime(50ms); + } + + REQUIRE(metrics.getAverageOnTriggerRuntime() == 50ms); + REQUIRE(metrics.getLastOnTriggerRuntime() == 50ms); + + metrics.addLastOnTriggerRuntime(10ms); + REQUIRE(metrics.getLastOnTriggerRuntime() == 10ms); +} Review Comment: we could add ```c++ REQUIRE(metrics.getLastOnTriggerRuntime() == 46ms); ``` at the end, as well -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org