adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041281303


##########
extensions/mqtt/processors/ConsumeMQTT.cpp:
##########
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, 
MQTTMessageDeleter> message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-    logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+    logger_->log_error("MQTT queue full");
     return;
   }
 
-  if (gsl::narrow<uint64_t>(message->payloadlen) > max_seg_size_) {
-    logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-    message->payloadlen = gsl::narrow<int>(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr<core::ProcessContext> 
&context, const std::shared_ptr<core::ProcessSessionFactory> &factory) {
+void ConsumeMQTT::readProperties(const std::shared_ptr<core::ProcessContext>& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+    topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty<bool>(CleanSession)) {
-    cleanSession_ = *value;
-    logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+    clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty<bool>(CleanStart)) {
+    clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty<core::TimePeriodValue>(SessionExpiryInterval)) {
+    session_expiry_interval_ = 
std::chrono::duration_cast<std::chrono::seconds>(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty<uint64_t>(QueueBufferMaxMessage)) {
-    maxQueueSize_ = *value;
-    logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+    max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+    attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr<core::ProcessContext>& 
/*context*/, const std::shared_ptr<core::ProcessSession> &session) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty<uint32_t>(TopicAliasMaximum)) {
+    topic_alias_maximum_ = gsl::narrow<uint16_t>(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-    logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-    yield();
-    return;
+  if (const auto receive_maximum = 
context->getProperty<uint32_t>(ReceiveMaximum)) {
+    receive_maximum_ = gsl::narrow<uint16_t>(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr<core::ProcessContext>& 
/*context*/, const std::shared_ptr<core::ProcessSession>& session) {
+  std::queue<SmartMessage> msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
     const auto& message = msg_queue.front();
-    std::shared_ptr<core::FlowFile> processFlowFile = session->create();
-    int write_status{};
-    session->write(processFlowFile, [&message, &write_status](const 
std::shared_ptr<io::OutputStream>& stream) -> int64_t {
-      if (message->payloadlen < 0) {
-        write_status = -1;
-        return -1;
-      }
-      const auto len = 
stream->write(reinterpret_cast<uint8_t*>(message->payload), 
gsl::narrow<size_t>(message->payloadlen));
-      if (io::isError(len)) {
-        write_status = -1;
-        return -1;
-      }
-      return gsl::narrow<int64_t>(len);
-    });
-    if (write_status < 0) {
-      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-      session->remove(processFlowFile);
+    std::shared_ptr<core::FlowFile> flow_file = session->create();
+    WriteCallback write_callback(message, logger_);
+    try {
+      session->write(flow_file, write_callback);
+    } catch (const Exception& ex) {
+      logger_->log_error("Error when processing message queue: %s", ex.what());
+    }
+    if (!write_callback.getSuccessStatus()) {
+      logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
flow_file->getUUIDStr());
+      session->remove(flow_file);
     } else {
-      session->putAttribute(processFlowFile, MQTT_BROKER_ATTRIBUTE, uri_);
-      session->putAttribute(processFlowFile, MQTT_TOPIC_ATTRIBUTE, topic_);
-      logger_->log_debug("ConsumeMQTT processing success for the flow with 
UUID %s topic %s", processFlowFile->getUUIDStr(), topic_);
-      session->transfer(processFlowFile, Success);
+      putUserPropertiesAsAttributes(message, flow_file, session);
+      session->putAttribute(flow_file, MQTT_BROKER_ATTRIBUTE, uri_);
+      session->putAttribute(flow_file, MQTT_TOPIC_ATTRIBUTE, message.topic);
+      fillAttributeFromContentType(message, flow_file, session);
+      logger_->log_debug("ConsumeMQTT processing success for the flow with 
UUID %s topic %s", flow_file->getUUIDStr(), message.topic);
+      session->transfer(flow_file, Success);
     }
-    msg_queue.pop_front();
+    msg_queue.pop();
+  }
+}
+
+std::queue<ConsumeMQTT::SmartMessage> ConsumeMQTT::getReceivedMqttMessages() {
+  std::queue<SmartMessage> msg_queue;
+  SmartMessage message;
+  while (queue_.try_dequeue(message)) {
+    msg_queue.push(std::move(message));
+  }
+  return msg_queue;
+}
+
+int64_t ConsumeMQTT::WriteCallback::operator() (const 
std::shared_ptr<io::OutputStream>& stream) {
+  if (message_.contents->payloadlen < 0) {
+    success_status_ = false;
+    logger_->log_error("Payload length of message is negative, value is [%d]", 
message_.contents->payloadlen);
+    return -1;
+  }
+
+  const auto len = 
stream->write(reinterpret_cast<uint8_t*>(message_.contents->payload), 
gsl::narrow<size_t>(message_.contents->payloadlen));
+  if (io::isError(len)) {
+    success_status_ = false;
+    logger_->log_error("Stream writing error when processing message");
+    return -1;
+  }
+
+  return len;
+}
+
+void ConsumeMQTT::putUserPropertiesAsAttributes(const SmartMessage& message, 
const std::shared_ptr<core::FlowFile>& flow_file, const 
std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0) {
+    return;
   }
+
+  const auto property_count = 
MQTTProperties_propertyCount(&message.contents->properties, 
MQTTPROPERTY_CODE_USER_PROPERTY);
+  for (int i=0; i < property_count; ++i) {
+    MQTTProperty* property = 
MQTTProperties_getPropertyAt(&message.contents->properties, 
MQTTPROPERTY_CODE_USER_PROPERTY, i);
+    std::string key(property->value.data.data, property->value.data.len);
+    std::string value(property->value.value.data, property->value.value.len);
+    session->putAttribute(flow_file, key, value);
+  }
+}
+
+void ConsumeMQTT::fillAttributeFromContentType(const SmartMessage& message, 
const std::shared_ptr<core::FlowFile>& flow_file, const 
std::shared_ptr<core::ProcessSession>& session) const {
+  if (mqtt_version_.value() != MqttVersions::V_5_0 || 
attribute_from_content_type_.empty()) {
+    return;
+  }
+
+  MQTTProperty* property = 
MQTTProperties_getProperty(&message.contents->properties, 
MQTTPROPERTY_CODE_CONTENT_TYPE);
+  if (property == nullptr) {
+    return;
+  }
+
+  std::string content_type(property->value.data.data, 
property->value.data.len);
+  session->putAttribute(flow_file, attribute_from_content_type_, content_type);
 }
 
-bool ConsumeMQTT::startupClient() {
+void ConsumeMQTT::startupClient() {
   MQTTAsync_responseOptions response_options = 
MQTTAsync_responseOptions_initializer;
   response_options.context = this;
-  response_options.onSuccess = subscriptionSuccess;
-  response_options.onFailure = subscriptionFailure;
-  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), 
gsl::narrow<int>(qos_), &response_options);
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    response_options.onSuccess5 = subscriptionSuccess5;
+    response_options.onFailure5 = subscriptionFailure5;
+  } else {
+    response_options.onSuccess = subscriptionSuccess;
+    response_options.onFailure = subscriptionFailure;
+  }
+
+  const int ret = MQTTAsync_subscribe(client_, topic_.c_str(), 
gsl::narrow<int>(qos_.value()), &response_options);
   if (ret != MQTTASYNC_SUCCESS) {
     logger_->log_error("Failed to subscribe to MQTT topic %s (%d)", topic_, 
ret);
-    return false;
+    return;
   }
   logger_->log_debug("Successfully subscribed to MQTT topic: %s", topic_);
-  return true;
 }
 
-void ConsumeMQTT::onMessageReceived(char* topic_name, int /*topic_len*/, 
MQTTAsync_message* message) {
-  MQTTAsync_free(topic_name);
+void ConsumeMQTT::onMessageReceived(SmartMessage smart_message) {
+  if (mqtt_version_ == MqttVersions::V_5_0) {
+    resolveTopicFromAlias(smart_message);
+  }
+
+  if (smart_message.topic.empty()) {
+    logger_->log_error("Received message without topic");
+    return;
+  }
+
+  enqueueReceivedMQTTMsg(std::move(smart_message));
+}
+
+void ConsumeMQTT::resolveTopicFromAlias(SmartMessage& smart_message) {
+  auto raw_alias = 
MQTTProperties_getNumericValue(&smart_message.contents->properties, 
MQTTPROPERTY_CODE_TOPIC_ALIAS);
+
+  std::optional<uint16_t> alias;
+  if (raw_alias != PAHO_MQTT_C_FAILURE_CODE) {
+    alias = gsl::narrow<uint16_t>(raw_alias);
+  }
+
+  auto& topic = smart_message.topic;
 
-  const auto* msgPayload = reinterpret_cast<const char*>(message->payload);
-  const size_t msgLen = message->payloadlen;
-  const std::string messageText(msgPayload, msgLen);
-  logger_->log_debug("Received message \"%s\" to MQTT topic %s on broker %s", 
messageText, topic_, uri_);
+  if (alias.has_value()) {
+    if (*alias > topic_alias_maximum_) {
+      logger_->log_error("Broker does not respect client's Topic Alias 
Maximum, sent a greater value: %" PRIu16 " > %" PRIu16, *alias, 
topic_alias_maximum_);
+      return;
+    }
 
-  std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> smartMessage(message);
-  enqueueReceivedMQTTMsg(std::move(smartMessage));
+    // if topic is empty, this is just a usage of a previously stored alias 
(look it up), otherwise a new one (store it)
+    if (topic.empty()) {
+      const auto iter = alias_to_topic_.find(*alias);
+      if (iter == alias_to_topic_.end()) {
+        logger_->log_error("Broker sent an alias that was not known to client 
before: %" PRIu16, *alias);
+      } else {
+        topic = iter->second;
+      }
+    } else {
+      alias_to_topic_[*alias] = topic;
+    }
+  } else if (topic.empty()) {
+    logger_->log_error("Received message without topic and alias");
+  }
 }
 
 void ConsumeMQTT::checkProperties() {
-  if (!cleanSession_ && clientID_.empty()) {
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a Client 
ID for durable (non-clean) sessions");
+  if (mqtt_version_ == MqttVersions::V_3_1_0 || mqtt_version_ == 
MqttVersions::V_3_1_1 || mqtt_version_ == MqttVersions::V_3X_AUTO) {
+    if (isPropertyExplicitlySet(CleanStart)) {
+      logger_->log_warn("MQTT 3.x specification does not support Clean Start. 
Property is not used.");
+    }
+    if (isPropertyExplicitlySet(SessionExpiryInterval)) {
+      logger_->log_warn("MQTT 3.x specification does not support Session 
Expiry Intervals. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(AttributeFromContentType)) {
+      logger_->log_warn("MQTT 3.x specification does not support Content Types 
and thus attributes cannot be created from them. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(TopicAliasMaximum)) {
+      logger_->log_warn("MQTT 3.x specification does not support Topic Alias 
Maximum. Property is not used.");
+    }
+    if (isPropertyExplicitlySet(ReceiveMaximum)) {
+      logger_->log_warn("MQTT 3.x specification does not support Receive 
Maximum. Property is not used.");
+    }
+  }
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0 && 
isPropertyExplicitlySet(CleanSession)) {
+    logger_->log_warn("MQTT 5.0 specification does not support Clean Session. 
Property is not used.");
+  }
+
+  if (clientID_.empty()) {
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      if (session_expiry_interval_ > std::chrono::seconds(0)) {
+        throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a 
Client ID for durable (Session Expiry Interval > 0) sessions");
+      }
+    } else if (!clean_session_) {
+      throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Processor must have a 
Client ID for durable (non-clean) sessions");
+    }
+  }
+
+  if (qos_ == MqttQoS::LEVEL_0) {
+    if (mqtt_version_.value() == MqttVersions::V_5_0) {
+      if (session_expiry_interval_ > std::chrono::seconds(0)) {
+        logger_->log_warn("Messages are not preserved during client 
disconnection "
+                          "by the broker when QoS is less than 1 for durable 
(Session Expiry Interval > 0) sessions. Only subscriptions are preserved.");
+      }
+    } else if (!clean_session_) {
+      logger_->log_warn("Messages are not preserved during client 
disconnection "
+                        "by the broker when QoS is less than 1 for durable 
(non-clean) sessions. Only subscriptions are preserved.");
+    }
+  }
+}
+
+void ConsumeMQTT::checkBrokerLimitsImpl() {
+  auto hasWildcards = [] (std::string_view topic) {
+    return std::any_of(topic.begin(), topic.end(), [] (const char ch) {return 
ch == '+' || ch == '#';});
+  };
+
+  if (wildcard_subscription_available_ == false && hasWildcards(topic_)) {
+    std::ostringstream os;
+    os << "Broker does not support wildcards but topic \"" << topic_ <<"\" has 
them";
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, os.str());
+  }
+
+  if (maximum_session_expiry_interval_.has_value() && session_expiry_interval_ 
> maximum_session_expiry_interval_) {
+    std::ostringstream os;
+    os << "Set Session Expiry Interval (" << session_expiry_interval_.count() 
<<" s) is longer then maximum supported by broker (" << 
maximum_session_expiry_interval_->count() << " s).";

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to