lordgamez commented on a change in pull request #1066:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1066#discussion_r629362515



##########
File path: extensions/pdh/PDHCounters.h
##########
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <TCHAR.h>
+#include <pdh.h>
+#include <pdhmsg.h>
+#include <string>
+
+#include "PerformanceDataCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class PDHCounterBase : public PerformanceDataCounter {

Review comment:
       I would prefer a naming like PDHCounter -> SinglePDHCounter, 
PDHCounterArray just to lose the Base.

##########
File path: extensions/pdh/MemoryConsumptionCounter.h
##########
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "PerformanceDataCounter.h"
+#include <string>
+#include "utils/OsUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class MemoryConsumptionCounter : public PerformanceDataCounter {
+ public:
+  MemoryConsumptionCounter() {

Review comment:
       This can be removed.

##########
File path: extensions/pdh/PerformanceDataMonitor.cpp
##########
@@ -0,0 +1,283 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are 
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+    core::PropertyBuilder::createProperty("Predefined Groups")->
+    withDescription("Comma separated list to which predefined groups should be 
included")->
+    withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+    core::PropertyBuilder::createProperty("Custom PDH Counters")->
+    withDescription("Comma separated list of PDHCounters to collect from")->
+    withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+    core::PropertyBuilder::createProperty("Output format")->
+    withDescription("Format of the created flowfiles")->
+    withAllowableValue<std::string>(JSON_FORMAT_STR)->
+    withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+    withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+  if (pdh_query_ != nullptr)
+    PdhCloseQuery(pdh_query_);
+  for (PerformanceDataCounter* resource_consumption_counter : 
resource_consumption_counters_) {
+    delete resource_consumption_counter;
+  }
+}
+
+void PerformanceDataMonitor::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  setupMembersFromProperties(context);
+
+  std::vector<PerformanceDataCounter*> valid_counters;
+
+  for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+    PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*> (counter);
+    if (pdh_counter != nullptr) {
+      if (pdh_query_ == nullptr)
+        PdhOpenQuery(NULL, NULL, &pdh_query_);

Review comment:
       This could changed to pass `nullptr`s instead.

##########
File path: extensions/pdh/PerformanceDataMonitor.cpp
##########
@@ -0,0 +1,283 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are 
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+    core::PropertyBuilder::createProperty("Predefined Groups")->
+    withDescription("Comma separated list to which predefined groups should be 
included")->
+    withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+    core::PropertyBuilder::createProperty("Custom PDH Counters")->
+    withDescription("Comma separated list of PDHCounters to collect from")->
+    withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+    core::PropertyBuilder::createProperty("Output format")->
+    withDescription("Format of the created flowfiles")->
+    withAllowableValue<std::string>(JSON_FORMAT_STR)->
+    withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+    withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+  if (pdh_query_ != nullptr)
+    PdhCloseQuery(pdh_query_);
+  for (PerformanceDataCounter* resource_consumption_counter : 
resource_consumption_counters_) {
+    delete resource_consumption_counter;
+  }
+}
+
+void PerformanceDataMonitor::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  setupMembersFromProperties(context);
+
+  std::vector<PerformanceDataCounter*> valid_counters;
+
+  for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+    PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*> (counter);
+    if (pdh_counter != nullptr) {
+      if (pdh_query_ == nullptr)
+        PdhOpenQuery(NULL, NULL, &pdh_query_);
+      PDH_STATUS add_to_query_result = pdh_counter->addToQuery(pdh_query_);
+      if (add_to_query_result != ERROR_SUCCESS) {
+        logger_->log_error(("Error adding " + pdh_counter->getName() + " to 
query: " + std::to_string(add_to_query_result)).c_str());
+        delete counter;

Review comment:
       Could we just simply iterate through the elements of 
`resource_consumption_counters_` and remove the current element when 
`addToQuery` fails? That could be done without invalidating the iterator.

##########
File path: extensions/pdh/PDHCounters.h
##########
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <TCHAR.h>
+#include <pdh.h>
+#include <pdhmsg.h>
+#include <string>
+
+#include "PerformanceDataCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class PDHCounterBase : public PerformanceDataCounter {
+ public:
+  virtual ~PDHCounterBase() {}
+  static PDHCounterBase* createPDHCounter(const std::string& query_name, bool 
is_double = true);
+
+  const std::string& getName() const;
+  virtual std::string getObjectName() const;
+  virtual std::string getCounterName() const;
+  virtual PDH_STATUS addToQuery(PDH_HQUERY& pdh_query) = 0;
+  virtual PDH_STATUS collectData() = 0;
+ protected:
+  PDHCounterBase(const std::string& query_name, bool is_double) : 
PerformanceDataCounter(),
+    is_double_format_(is_double), pdh_english_counter_name_(query_name) {
+  }
+  DWORD getDWFormat() const;
+  const bool is_double_format_;
+  std::string pdh_english_counter_name_;
+};
+
+class PDHCounter : public PDHCounterBase {
+ public:
+  void addToJson(rapidjson::Value& body, rapidjson::Document::AllocatorType& 
alloc) const override;
+
+  PDH_STATUS addToQuery(PDH_HQUERY& pdh_query) override;
+  PDH_STATUS collectData() override;
+
+ protected:
+  friend PDHCounterBase* PDHCounterBase::createPDHCounter(const std::string& 
query_name, bool is_double);
+  explicit PDHCounter(const std::string& query_name, bool is_double) : 
PDHCounterBase(query_name, is_double),
+    counter_(), current_value_() {

Review comment:
       Is the empty initialization required for these types in the 
initialization list?

##########
File path: extensions/pdh/PerformanceDataCounter.h
##########
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <string>
+#include "rapidjson/document.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class PerformanceDataCounter {
+ public:
+  PerformanceDataCounter() = default;
+  virtual ~PerformanceDataCounter() {}

Review comment:
       This could use the `default` keyword as well.

##########
File path: extensions/pdh/PDHCounters.h
##########
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <TCHAR.h>
+#include <pdh.h>
+#include <pdhmsg.h>
+#include <string>
+
+#include "PerformanceDataCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+class PDHCounterBase : public PerformanceDataCounter {
+ public:
+  virtual ~PDHCounterBase() {}
+  static PDHCounterBase* createPDHCounter(const std::string& query_name, bool 
is_double = true);
+
+  const std::string& getName() const;
+  virtual std::string getObjectName() const;
+  virtual std::string getCounterName() const;
+  virtual PDH_STATUS addToQuery(PDH_HQUERY& pdh_query) = 0;
+  virtual PDH_STATUS collectData() = 0;
+ protected:
+  PDHCounterBase(const std::string& query_name, bool is_double) : 
PerformanceDataCounter(),
+    is_double_format_(is_double), pdh_english_counter_name_(query_name) {
+  }
+  DWORD getDWFormat() const;
+  const bool is_double_format_;
+  std::string pdh_english_counter_name_;
+};
+
+class PDHCounter : public PDHCounterBase {
+ public:
+  void addToJson(rapidjson::Value& body, rapidjson::Document::AllocatorType& 
alloc) const override;
+
+  PDH_STATUS addToQuery(PDH_HQUERY& pdh_query) override;
+  PDH_STATUS collectData() override;
+
+ protected:
+  friend PDHCounterBase* PDHCounterBase::createPDHCounter(const std::string& 
query_name, bool is_double);

Review comment:
       What's the reason for being this strict with the instantiation of the 
PDHCounters? Why is the instantiation restricted to the factory function only?

##########
File path: extensions/pdh/PDHCounters.cpp
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PDHCounters.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+DWORD PDHCounterBase::getDWFormat() const {
+  return is_double_format_ ? PDH_FMT_DOUBLE : PDH_FMT_LARGE;
+}
+
+PDHCounterBase* PDHCounterBase::createPDHCounter(const std::string& 
query_name, bool is_double) {

Review comment:
       Could we use a unique_ptr for the return type and change the 
`resource_consumption_counters_` vector to be a collection of unique_ptrs 
instead?

##########
File path: extensions/pdh/PDHCounters.cpp
##########
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PDHCounters.h"
+#include "utils/StringUtils.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+DWORD PDHCounterBase::getDWFormat() const {
+  return is_double_format_ ? PDH_FMT_DOUBLE : PDH_FMT_LARGE;
+}
+
+PDHCounterBase* PDHCounterBase::createPDHCounter(const std::string& 
query_name, bool is_double) {
+  auto groups = utils::StringUtils::split(query_name, "\\");
+  if (groups.size() != 2 || query_name.substr(0, 1) != "\\")
+    return nullptr;
+  if (query_name.find("(*)") != std::string::npos) {
+    return new PDHCounterArray(query_name, is_double);
+  }  else {
+    return new PDHCounter(query_name, is_double);
+  }
+}
+
+const std::string& PDHCounterBase::getName() const {
+  return pdh_english_counter_name_;
+}
+
+std::string PDHCounterBase::getObjectName() const {
+  auto groups = utils::StringUtils::split(pdh_english_counter_name_, "\\");
+  return groups[0];
+}
+
+std::string PDHCounterBase::getCounterName() const {
+  auto groups = utils::StringUtils::split(pdh_english_counter_name_, "\\");
+  return groups[1];
+}
+
+void PDHCounter::addToJson(rapidjson::Value& body, 
rapidjson::Document::AllocatorType& alloc) const {
+  rapidjson::Value key(getCounterName().c_str(), getCounterName().length(), 
alloc);
+  rapidjson::Value& group_node = acquireNode(getObjectName(), body, alloc);
+  group_node.AddMember(key, getValue(), alloc);
+}
+
+PDH_STATUS PDHCounter::addToQuery(PDH_HQUERY& pdh_query)  {
+  return PdhAddEnglishCounter(pdh_query, pdh_english_counter_name_.c_str(), 
NULL, &counter_);
+}
+
+PDH_STATUS PDHCounter::collectData() {
+  return PdhGetFormattedCounterValue(counter_, getDWFormat(), NULL, 
&current_value_);
+}

Review comment:
       You could use `nullptr` instead of `NULL` here, and in other usages below

##########
File path: extensions/pdh/PerformanceDataMonitor.cpp
##########
@@ -0,0 +1,283 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are 
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+    core::PropertyBuilder::createProperty("Predefined Groups")->
+    withDescription("Comma separated list to which predefined groups should be 
included")->
+    withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+    core::PropertyBuilder::createProperty("Custom PDH Counters")->
+    withDescription("Comma separated list of PDHCounters to collect from")->
+    withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+    core::PropertyBuilder::createProperty("Output format")->
+    withDescription("Format of the created flowfiles")->
+    withAllowableValue<std::string>(JSON_FORMAT_STR)->
+    withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+    withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+  if (pdh_query_ != nullptr)
+    PdhCloseQuery(pdh_query_);
+  for (PerformanceDataCounter* resource_consumption_counter : 
resource_consumption_counters_) {
+    delete resource_consumption_counter;
+  }
+}
+
+void PerformanceDataMonitor::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  setupMembersFromProperties(context);
+
+  std::vector<PerformanceDataCounter*> valid_counters;
+
+  for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+    PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*> (counter);

Review comment:
       It seems like the elements of `resource_consumption_counters_` are cast 
to `PDHCounterBase*` in most places. Could the `resource_consumption_counters_` 
be changed to a vector of `PDHCounterBase` pointers instead of 
`PerformanceDataCounter` pointers?

##########
File path: extensions/pdh/PerformanceDataMonitor.h
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <pdh.h>
+#include <string>
+#include <vector>
+#include <memory>
+#include <utility>
+
+#include "core/Processor.h"
+
+#include "rapidjson/stream.h"
+#include "rapidjson/writer.h"
+#include "rapidjson/prettywriter.h"
+
+#include "PerformanceDataCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+// PerformanceDataMonitor Class
+class PerformanceDataMonitor : public core::Processor {
+ public:
+  static constexpr char const* JSON_FORMAT_STR = "JSON";
+  static constexpr char const* OPEN_TELEMETRY_FORMAT_STR = "OpenTelemetry";
+
+  explicit PerformanceDataMonitor(const std::string& name, utils::Identifier 
uuid = utils::Identifier())
+      : Processor(name, uuid), output_format_(OutputFormat::kJSON),
+      logger_(logging::LoggerFactory<PerformanceDataMonitor>::getLogger()),
+      pdh_query_(nullptr), resource_consumption_counters_() {
+  }
+  ~PerformanceDataMonitor() override;
+  static constexpr char const* ProcessorName = "PerformanceDataMonitor";
+  // Supported Properties
+  static core::Property PredefinedGroups;
+  static core::Property CustomPDHCounters;
+  static core::Property OutputFormatProperty;
+  // Supported Relationships
+  static core::Relationship Success;
+  // Nest Callback Class for write stream
+  class WriteCallback : public OutputStreamCallback {
+   public:
+    explicit WriteCallback(rapidjson::Document&& root) : 
root_(std::move(root)) {
+    }
+    rapidjson::Document root_;
+    int64_t process(const std::shared_ptr<io::BaseStream>& stream) {
+      rapidjson::StringBuffer buffer;
+      rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
+      root_.Accept(writer);
+      return stream->write(reinterpret_cast<const 
uint8_t*>(buffer.GetString()), gsl::narrow<int>(buffer.GetSize()));
+    }
+  };
+
+ public:
+  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
+  void onTrigger(core::ProcessContext *context, core::ProcessSession *session) 
override;
+  void initialize(void) override;
+
+ protected:
+  enum class OutputFormat {
+    kJSON, kOpenTelemetry

Review comment:
       What does the `k` prefix stand for here?

##########
File path: extensions/pdh/PerformanceDataMonitor.cpp
##########
@@ -0,0 +1,283 @@
+/**
+ * @file GenerateFlowFile.cpp
+ * GenerateFlowFile class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PerformanceDataMonitor.h"
+#include "PDHCounters.h"
+#include "MemoryConsumptionCounter.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace processors {
+
+core::Relationship PerformanceDataMonitor::Success("success", "All files are 
routed to success");
+
+core::Property PerformanceDataMonitor::PredefinedGroups(
+    core::PropertyBuilder::createProperty("Predefined Groups")->
+    withDescription("Comma separated list to which predefined groups should be 
included")->
+    withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::CustomPDHCounters(
+    core::PropertyBuilder::createProperty("Custom PDH Counters")->
+    withDescription("Comma separated list of PDHCounters to collect from")->
+    withDefaultValue("")->build());
+
+core::Property PerformanceDataMonitor::OutputFormatProperty(
+    core::PropertyBuilder::createProperty("Output format")->
+    withDescription("Format of the created flowfiles")->
+    withAllowableValue<std::string>(JSON_FORMAT_STR)->
+    withAllowableValue(OPEN_TELEMETRY_FORMAT_STR)->
+    withDefaultValue(JSON_FORMAT_STR)->build());
+
+PerformanceDataMonitor::~PerformanceDataMonitor() {
+  if (pdh_query_ != nullptr)
+    PdhCloseQuery(pdh_query_);
+  for (PerformanceDataCounter* resource_consumption_counter : 
resource_consumption_counters_) {
+    delete resource_consumption_counter;
+  }
+}
+
+void PerformanceDataMonitor::onSchedule(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
+  setupMembersFromProperties(context);
+
+  std::vector<PerformanceDataCounter*> valid_counters;
+
+  for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+    PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*> (counter);
+    if (pdh_counter != nullptr) {
+      if (pdh_query_ == nullptr)
+        PdhOpenQuery(NULL, NULL, &pdh_query_);
+      PDH_STATUS add_to_query_result = pdh_counter->addToQuery(pdh_query_);
+      if (add_to_query_result != ERROR_SUCCESS) {
+        logger_->log_error(("Error adding " + pdh_counter->getName() + " to 
query: " + std::to_string(add_to_query_result)).c_str());
+        delete counter;
+      } else {
+        valid_counters.push_back(counter);
+      }
+    } else {
+      valid_counters.push_back(counter);
+    }
+  }
+  resource_consumption_counters_ = valid_counters;
+  PdhCollectQueryData(pdh_query_);
+}
+
+void PerformanceDataMonitor::onTrigger(core::ProcessContext* context, 
core::ProcessSession* session) {
+  if (resource_consumption_counters_.empty()) {
+    logger_->log_error("No valid counters for PerformanceDataMonitor");
+    return;
+  }
+
+  std::shared_ptr<core::FlowFile> flowFile = session->create();
+  if (!flowFile) {
+    logger_->log_error("Failed to create flowfile!");
+    return;
+  }
+
+  if (pdh_query_ != nullptr)
+    PdhCollectQueryData(pdh_query_);
+
+  rapidjson::Document root = rapidjson::Document(rapidjson::kObjectType);
+  rapidjson::Value& body = prepareJSONBody(root);
+  for (PerformanceDataCounter* counter : resource_consumption_counters_) {
+    PDHCounterBase* pdh_counter = dynamic_cast<PDHCounterBase*>(counter);
+    if (pdh_counter != nullptr && pdh_counter->collectData() != ERROR_SUCCESS)
+      continue;
+    counter->addToJson(body, root.GetAllocator());
+  }
+  PerformanceDataMonitor::WriteCallback callback(std::move(root));
+  session->write(flowFile, &callback);
+  session->transfer(flowFile, Success);
+}
+
+void PerformanceDataMonitor::initialize(void) {
+  setSupportedProperties({ CustomPDHCounters, PredefinedGroups, 
OutputFormatProperty });
+  setSupportedRelationships({ PerformanceDataMonitor::Success });
+}
+
+rapidjson::Value& PerformanceDataMonitor::prepareJSONBody(rapidjson::Document& 
root) {
+  switch (output_format_) {
+    case (OutputFormat::kOpenTelemetry):

Review comment:
       I think the braces are unnecessary here.

##########
File path: extensions/pdh/tests/PerformanceDataMonitorTests.cpp
##########
@@ -0,0 +1,268 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+#include <string>
+#include <vector>
+#include <set>
+#include <fstream>
+
+#include "TestBase.h"
+#include "processors/PutFile.h"
+#include "utils/file/FileUtils.h"
+#include "PerformanceDataMonitor.h"
+#include "rapidjson/filereadstream.h"
+
+using PutFile = org::apache::nifi::minifi::processors::PutFile;
+using PerformanceDataMonitor = 
org::apache::nifi::minifi::processors::PerformanceDataMonitor;
+using PerformanceDataCounter = 
org::apache::nifi::minifi::processors::PerformanceDataCounter;
+
+class PerformanceDataMonitorTester {
+ public:
+  PerformanceDataMonitorTester() {
+    LogTestController::getInstance().setTrace<TestPlan>();
+    dir_ = test_controller_.createTempDirectory("/tmp/gt.XXXXXX");
+    plan_ = test_controller_.createPlan();
+    performance_monitor_ = plan_->addProcessor("PerformanceDataMonitor", 
"pdhsys");
+    putfile_ = plan_->addProcessor("PutFile", "putfile", 
core::Relationship("success", "description"), true);
+    plan_->setProperty(putfile_, PutFile::Directory.getName(), dir_);
+  }
+
+  void runProcessors() {
+    plan_->runNextProcessor();      // PerformanceMonitor
+    std::this_thread::sleep_for(std::chrono::milliseconds(200));
+    plan_->runCurrentProcessor();   // PerformanceMonitor
+    plan_->runNextProcessor();      // PutFile
+    plan_->runCurrentProcessor();   // PutFile
+  }
+
+  void setPerformanceMonitorProperty(core::Property property, const 
std::string& value) {
+    plan_->setProperty(performance_monitor_, property.getName(), value);
+  }
+
+  TestController test_controller_;
+  std::string dir_;
+  std::shared_ptr<TestPlan> plan_;
+  std::shared_ptr<core::Processor> performance_monitor_;
+  std::shared_ptr<core::Processor> putfile_;
+};
+
+
+TEST_CASE("PerformanceDataMonitorEmptyPropertiesTest", 
"[performancedatamonitoremptypropertiestest]") {
+  PerformanceDataMonitorTester tester;
+  tester.runProcessors();
+
+  REQUIRE(tester.test_controller_.getLog().getInstance().contains("No valid 
counters for PerformanceDataMonitor", std::chrono::seconds(0)));
+
+  uint32_t number_of_flowfiles = 0;
+  auto lambda = [&number_of_flowfiles](const std::string& path, const 
std::string& filename) -> bool {
+    ++number_of_flowfiles;
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(tester.dir_, lambda, 
tester.plan_->getLogger(), false);
+  REQUIRE(number_of_flowfiles == 0);
+}
+
+TEST_CASE("PerformanceDataMonitorPartiallyInvalidGroupPropertyTest", 
"[performancedatamonitorpartiallyinvalidgrouppropertytest]") {
+  PerformanceDataMonitorTester tester;
+  
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::PredefinedGroups, 
"Disk,CPU,Asd");
+  
tester.setPerformanceMonitorProperty(PerformanceDataMonitor::CustomPDHCounters, 
"\\Invalid\\Counter,\\System\\Processes");
+  tester.runProcessors();
+
+  REQUIRE(tester.test_controller_.getLog().getInstance().contains("Asd is not 
a valid predefined group", std::chrono::seconds(0)));
+  REQUIRE(tester.test_controller_.getLog().getInstance().contains("Error 
adding \\Invalid\\Counter to query", std::chrono::seconds(0)));
+
+  uint32_t number_of_flowfiles = 0;
+
+  auto lambda = [&number_of_flowfiles](const std::string& path, const 
std::string& filename) -> bool {
+    ++number_of_flowfiles;
+    FILE* fp = fopen((path + utils::file::FileUtils::get_separator() + 
filename).c_str(), "r");
+    REQUIRE(fp != nullptr);
+    char readBuffer[500];
+    rapidjson::FileReadStream is(fp, readBuffer, sizeof(readBuffer));
+    rapidjson::Document d;

Review comment:
       Minor, but I'd avoid the 1 character variable names even in tests for 
easier search/replace purposes later.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to