adam-markovics commented on code in PR #1363:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1363#discussion_r925533498


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -37,48 +34,64 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-bool ConsumeMQTT::enqueueReceiveMQTTMsg(MQTTClient_message *message) {
+void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, 
MQTTMessageDeleter> message) {
   if (queue_.size_approx() >= maxQueueSize_) {
     logger_->log_warn("MQTT queue full");
-    return false;
-  } else {
-    if (gsl::narrow<uint64_t>(message->payloadlen) > maxSegSize_)
-      message->payloadlen = maxSegSize_;
-    queue_.enqueue(message);
-    logger_->log_debug("enqueue MQTT message length %d", message->payloadlen);
-    return true;
+    return;
+  }
+
+  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
+    logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
+    message->payloadlen = gsl::narrow<int>(max_seg_size_);
   }
+
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  queue_.enqueue(std::move(message));
 }
 
 void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
-  AbstractMQTTProcessor::onSchedule(context, factory);
   std::string value;
   int64_t valInt;
   value = "";
+  if (context->getProperty(CleanSession.getName(), value)) {
+    cleanSession_ = utils::StringUtils::toBool(value).value_or(cleanSession_);
+    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+  }
+  value = "";
   if (context->getProperty(QueueBufferMaxMessage.getName(), value) && 
!value.empty() && core::Property::StringToInt(value, valInt)) {
     maxQueueSize_ = valInt;
     logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
   }
-  value = "";
-  if (context->getProperty(MaxFlowSegSize.getName(), value) && !value.empty() 
&& core::Property::StringToInt(value, valInt)) {
-    maxSegSize_ = valInt;
-    logger_->log_debug("ConsumeMQTT: Max Flow Segment Size [%" PRIu64 "]", 
maxSegSize_);
+
+  AbstractMQTTProcessor::onSchedule(context, factory);

Review Comment:
   We could extend the parent's `onSchedule` with `CleanSession`, but not with 
`QueueBufferMaxMessage`. The former is used during connection, and that's okay. 
But the latter is specific to this processor and does not exist in its sibling 
class `PublishMQTT`. It is used asynchronously when receiving messages, and that 
could be right after connection, so its value has to be read before that. I 
added a comment for now.



##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -15,147 +15,167 @@
  * limitations under the License.
  */
 #include "AbstractMQTTProcessor.h"
-#include <cstdio>
 #include <memory>
 #include <string>
-#include <cinttypes>
-#include <vector>
+#include <utility>
 
-#include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
-#include "core/ProcessSession.h"
 
 namespace org::apache::nifi::minifi::processors {
 
 void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory>& /*factory*/) {
-  sslEnabled_ = false;
-  sslopts_ = MQTTClient_SSLOptions_initializer;
-
-  std::string value;
-  int64_t valInt;
-  value = "";
-  if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) {
-    uri_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: BrokerURL [%s]", uri_);
-  }
-  value = "";
-  if (context->getProperty(ClientID.getName(), value) && !value.empty()) {
-    clientID_ = value;
+  if (auto value = context->getProperty(BrokerURI)) {
+    uri_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_);
+  }
+  if (auto value = context->getProperty(ClientID)) {
+    clientID_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: ClientID [%s]", clientID_);
   }
-  value = "";
-  if (context->getProperty(Topic.getName(), value) && !value.empty()) {
-    topic_ = value;
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
     logger_->log_debug("AbstractMQTTProcessor: Topic [%s]", topic_);
   }
-  value = "";
-  if (context->getProperty(UserName.getName(), value) && !value.empty()) {
-    userName_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", userName_);
+  if (auto value = context->getProperty(Username)) {
+    username_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: UserName [%s]", username_);
   }
-  value = "";
-  if (context->getProperty(PassWord.getName(), value) && !value.empty()) {
-    passWord_ = value;
-    logger_->log_debug("AbstractMQTTProcessor: PassWord [%s]", passWord_);
+  if (auto value = context->getProperty(Password)) {
+    password_ = std::move(*value);
+    logger_->log_debug("AbstractMQTTProcessor: Password [%s]", password_);
   }
 
-  const auto cleanSession_parsed = [&] () -> std::optional<bool> {
-    std::string property_value;
-    if (!context->getProperty(CleanSession.getName(), property_value)) return 
std::nullopt;
-    return utils::StringUtils::toBool(property_value);
-  }();
-  if ( cleanSession_parsed ) {
-    cleanSession_ = *cleanSession_parsed;
-    logger_->log_debug("AbstractMQTTProcessor: CleanSession [%d]", 
cleanSession_);
+  if (const auto keep_alive_interval = 
context->getProperty<core::TimePeriodValue>(KeepAliveInterval)) {
+    keep_alive_interval_ = 
std::chrono::duration_cast<std::chrono::seconds>(keep_alive_interval->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: KeepAliveInterval [%" PRId64 "] 
s", int64_t{keep_alive_interval_.count()});
   }
 
-  if (auto keep_alive_interval = 
context->getProperty<core::TimePeriodValue>(KeepLiveInterval)) {
-    keepAliveInterval_ = keep_alive_interval->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: KeepLiveInterval [%" PRId64 "] 
ms", int64_t{keepAliveInterval_.count()});
+  if (const auto value = context->getProperty<uint64_t>(MaxFlowSegSize)) {
+    max_seg_size_ = {*value};
+    logger_->log_debug("PublishMQTT: max flow segment size [%" PRIu64 "]", 
max_seg_size_);
   }
 
-  if (auto connection_timeout = 
context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
-    connectionTimeout_ = connection_timeout->getMilliseconds();
-    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] 
ms", int64_t{connectionTimeout_.count()});
+  if (const auto connection_timeout = 
context->getProperty<core::TimePeriodValue>(ConnectionTimeout)) {
+    connection_timeout_ = 
std::chrono::duration_cast<std::chrono::seconds>(connection_timeout->getMilliseconds());
+    logger_->log_debug("AbstractMQTTProcessor: ConnectionTimeout [%" PRId64 "] 
s", int64_t{connection_timeout_.count()});
   }
 
-  value = "";
-  if (context->getProperty(QOS.getName(), value) && !value.empty() && (value 
== MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) &&
-      core::Property::StringToInt(value, valInt)) {
-    qos_ = valInt;
-    logger_->log_debug("AbstractMQTTProcessor: QOS [%" PRId64 "]", qos_);
+  if (const auto value = context->getProperty<uint32_t>(QoS); value && (*value 
== MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+    qos_ = {*value};
+    logger_->log_debug("AbstractMQTTProcessor: QoS [%" PRIu32 "]", qos_);
   }
-  value = "";
 
-  if (context->getProperty(SecurityProtocol.getName(), value) && 
!value.empty()) {
-    if (value == MQTT_SECURITY_PROTOCOL_SSL) {
-      sslEnabled_ = true;
-      value = "";
-      if (context->getProperty(SecurityCA.getName(), value) && !value.empty()) 
{
-        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", value);
-        securityCA_ = value;
-        sslopts_.trustStore = securityCA_.c_str();
+  if (const auto security_protocol = context->getProperty(SecurityProtocol)) {
+    if (*security_protocol == MQTT_SECURITY_PROTOCOL_SSL) {
+      sslOpts_ = MQTTAsync_SSLOptions_initializer;
+      if (auto value = context->getProperty(SecurityCA)) {
+        logger_->log_debug("AbstractMQTTProcessor: trustStore [%s]", *value);
+        securityCA_ = std::move(*value);
+        sslOpts_->trustStore = securityCA_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityCert.getName(), value) && 
!value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", value);
-        securityCert_ = value;
-        sslopts_.keyStore = securityCert_.c_str();
+      if (auto value = context->getProperty(SecurityCert)) {
+        logger_->log_debug("AbstractMQTTProcessor: keyStore [%s]", *value);
+        securityCert_ = std::move(*value);
+        sslOpts_->keyStore = securityCert_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKey.getName(), value) && 
!value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", value);
-        securityPrivateKey_ = value;
-        sslopts_.privateKey = securityPrivateKey_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKey)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKey [%s]", *value);
+        securityPrivateKey_ = std::move(*value);
+        sslOpts_->privateKey = securityPrivateKey_.c_str();
       }
-      value = "";
-      if (context->getProperty(SecurityPrivateKeyPassWord.getName(), value) && 
!value.empty()) {
-        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", 
value);
-        securityPrivateKeyPassWord_ = value;
-        sslopts_.privateKeyPassword = securityPrivateKeyPassWord_.c_str();
+      if (auto value = context->getProperty(SecurityPrivateKeyPassword)) {
+        logger_->log_debug("AbstractMQTTProcessor: privateKeyPassword [%s]", 
*value);
+        securityPrivateKeyPassword_ = std::move(*value);
+        sslOpts_->privateKeyPassword = securityPrivateKeyPassword_.c_str();
       }
     }
   }
+
+  if (auto last_will_topic = context->getProperty(LastWillTopic); 
last_will_topic.has_value() && !last_will_topic->empty()) {
+    last_will_ = MQTTAsync_willOptions_initializer;
+
+    logger_->log_debug("AbstractMQTTProcessor: Last Will Topic [%s]", 
*last_will_topic);
+    last_will_topic_ = std::move(*last_will_topic);
+    last_will_->topicName = last_will_topic_.c_str();
+
+    if (auto value = context->getProperty(LastWillMessage)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Message [%s]", 
*value);
+      last_will_message_ = std::move(*value);
+      last_will_->message = last_will_message_.c_str();
+    }
+
+    if (const auto value = context->getProperty<uint32_t>(LastWillQoS); value 
&& (*value == MQTT_QOS_0 || *value == MQTT_QOS_1 || *value == MQTT_QOS_2)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will QoS [%" PRIu32 "]", 
*value);
+      last_will_qos_ = {*value};
+      last_will_->qos = gsl::narrow<int>(last_will_qos_);
+    }
+
+    if (const auto value = context->getProperty<bool>(LastWillRetain)) {
+      logger_->log_debug("AbstractMQTTProcessor: Last Will Retain [%d]", 
*value);
+      last_will_retain_ = {*value};
+      last_will_->retained = last_will_retain_;
+    }
+  }
+
   if (!client_) {
-    MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), 
MQTTCLIENT_PERSISTENCE_NONE, nullptr);
+    if (MQTTAsync_create(&client_, uri_.c_str(), clientID_.c_str(), 
MQTTCLIENT_PERSISTENCE_NONE, nullptr) != MQTTASYNC_SUCCESS) {
+      logger_->log_error("Creating MQTT client failed");
+    }
   }
   if (client_) {
-    MQTTClient_setCallbacks(client_, this, connectionLost, msgReceived, 
msgDelivered);
+    MQTTAsync_setCallbacks(client_, this, connectionLost, msgReceived, 
nullptr);

Review Comment:
   Check added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to