[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1071202236 ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -137,49 +148,317 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptrlog_error("MQTT client is not existing while trying to reconnect"); +throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect"); + } + if (MQTTAsync_isConnected(client_)) { +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); return; } + + MQTTAsync_connectOptions connect_options; + MQTTProperties connect_properties = MQTTProperties_initializer; + MQTTProperties will_properties = MQTTProperties_initializer; + + ConnectFinishedTask connect_finished_task( + [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) { +onConnectFinished(success_data, success_data_5, failure_data, failure_data_5); + }); + + setConnectOptions(connect_options, connect_properties, will_properties, connect_finished_task); + + logger_->log_info("Reconnecting to %s", uri_); if (MQTTAsync_isConnected(client_)) { -logger_->log_info("Already connected to %s, no need to reconnect", uri_); +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); +return; + } + const int ret = MQTTAsync_connect(client_, _options); + MQTTProperties_free(_properties); + if (ret != MQTTASYNC_SUCCESS) { +logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret); return; } - MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; - conn_opts.keepAliveInterval = gsl::narrow(keep_alive_interval_.count()); - conn_opts.cleansession = getCleanSession(); - conn_opts.context = this; - conn_opts.onSuccess = connectionSuccess; - conn_opts.onFailure = connectionFailure; - conn_opts.connectTimeout = gsl::narrow(connection_timeout_.count()); + + // wait until connection succeeds or fails + connect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties, + MQTTProperties& will_properties, const ConnectFinishedTask& connect_finished_task) const { + if (mqtt_version_.value() == MqttVersions::V_5_0) { +setMqtt5ConnectOptions(connect_options, connect_properties, will_properties); + } else { +setMqtt3ConnectOptions(connect_options); + } + + connect_options.context = const_cast(_finished_task); + connect_options.connectTimeout = gsl::narrow(connection_timeout_.count()); + connect_options.keepAliveInterval = gsl::narrow(keep_alive_interval_.count()); if (!username_.empty()) { -conn_opts.username = username_.c_str(); -conn_opts.password = password_.c_str(); +connect_options.username = username_.c_str(); +connect_options.password = password_.c_str(); } if (sslOpts_) { -conn_opts.ssl = &*sslOpts_; +connect_options.ssl = const_cast(&*sslOpts_); } if (last_will_) { -conn_opts.will = &*last_will_; +connect_options.will = const_cast(&*last_will_); } +} - logger_->log_info("Reconnecting to %s", uri_); - int ret = MQTTAsync_connect(client_, _opts); - if (ret != MQTTASYNC_SUCCESS) { -logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret); +void AbstractMQTTProcessor::setMqtt3ConnectOptions(MQTTAsync_connectOptions& connect_options) const { + connect_options = MQTTAsync_connectOptions_initializer; + connect_options.onSuccess = connectionSuccess; + connect_options.onFailure = connectionFailure; + connect_options.cleansession = getCleanSession(); + + if (mqtt_version_.value() == MqttVersions::V_3_1_0) { +connect_options.MQTTVersion = MQTTVERSION_3_1; + } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) { +connect_options.MQTTVersion = MQTTVERSION_3_1_1; + } +} + +void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties, MQTTProperties& will_properties) const { + connect_options = MQTTAsync_connectOptions_initializer5; + connect_options.onSuccess5 = connectionSuccess5; + connect_options.onFailure5 = connectionFailure5; + connect_options.connectProperties = _properties; + + connect_options.cleanstart = getCleanStart(); + + { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL; +property.value.integer4 = gsl::narrow(getSessionExpiryInterval().count()); Review Comment: That does not, but MQTT 5 specs does: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048 "Followed by the Four Byte Integer representing the Session
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1071202236 ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -137,49 +148,317 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptrlog_error("MQTT client is not existing while trying to reconnect"); +throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect"); + } + if (MQTTAsync_isConnected(client_)) { +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); return; } + + MQTTAsync_connectOptions connect_options; + MQTTProperties connect_properties = MQTTProperties_initializer; + MQTTProperties will_properties = MQTTProperties_initializer; + + ConnectFinishedTask connect_finished_task( + [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) { +onConnectFinished(success_data, success_data_5, failure_data, failure_data_5); + }); + + setConnectOptions(connect_options, connect_properties, will_properties, connect_finished_task); + + logger_->log_info("Reconnecting to %s", uri_); if (MQTTAsync_isConnected(client_)) { -logger_->log_info("Already connected to %s, no need to reconnect", uri_); +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); +return; + } + const int ret = MQTTAsync_connect(client_, _options); + MQTTProperties_free(_properties); + if (ret != MQTTASYNC_SUCCESS) { +logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret); return; } - MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; - conn_opts.keepAliveInterval = gsl::narrow(keep_alive_interval_.count()); - conn_opts.cleansession = getCleanSession(); - conn_opts.context = this; - conn_opts.onSuccess = connectionSuccess; - conn_opts.onFailure = connectionFailure; - conn_opts.connectTimeout = gsl::narrow(connection_timeout_.count()); + + // wait until connection succeeds or fails + connect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties, + MQTTProperties& will_properties, const ConnectFinishedTask& connect_finished_task) const { + if (mqtt_version_.value() == MqttVersions::V_5_0) { +setMqtt5ConnectOptions(connect_options, connect_properties, will_properties); + } else { +setMqtt3ConnectOptions(connect_options); + } + + connect_options.context = const_cast(_finished_task); + connect_options.connectTimeout = gsl::narrow(connection_timeout_.count()); + connect_options.keepAliveInterval = gsl::narrow(keep_alive_interval_.count()); if (!username_.empty()) { -conn_opts.username = username_.c_str(); -conn_opts.password = password_.c_str(); +connect_options.username = username_.c_str(); +connect_options.password = password_.c_str(); } if (sslOpts_) { -conn_opts.ssl = &*sslOpts_; +connect_options.ssl = const_cast(&*sslOpts_); } if (last_will_) { -conn_opts.will = &*last_will_; +connect_options.will = const_cast(&*last_will_); } +} - logger_->log_info("Reconnecting to %s", uri_); - int ret = MQTTAsync_connect(client_, _opts); - if (ret != MQTTASYNC_SUCCESS) { -logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret); +void AbstractMQTTProcessor::setMqtt3ConnectOptions(MQTTAsync_connectOptions& connect_options) const { + connect_options = MQTTAsync_connectOptions_initializer; + connect_options.onSuccess = connectionSuccess; + connect_options.onFailure = connectionFailure; + connect_options.cleansession = getCleanSession(); + + if (mqtt_version_.value() == MqttVersions::V_3_1_0) { +connect_options.MQTTVersion = MQTTVERSION_3_1; + } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) { +connect_options.MQTTVersion = MQTTVERSION_3_1_1; + } +} + +void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties, MQTTProperties& will_properties) const { + connect_options = MQTTAsync_connectOptions_initializer5; + connect_options.onSuccess5 = connectionSuccess5; + connect_options.onFailure5 = connectionFailure5; + connect_options.connectProperties = _properties; + + connect_options.cleanstart = getCleanStart(); + + { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL; +property.value.integer4 = gsl::narrow(getSessionExpiryInterval().count()); Review Comment: That does not, but MQTT 5 specs does: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048 "Followed by the Four Byte Integer representing the Session
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1042215060 ## extensions/mqtt/processors/AbstractMQTTProcessor.h: ## @@ -46,150 +44,178 @@ class AbstractMQTTProcessor : public core::Processor { freeResources(); } + SMART_ENUM(MqttVersions, +(V_3X_AUTO, "3.x AUTO"), +(V_3_1_0, "3.1.0"), +(V_3_1_1, "3.1.1"), +(V_5_0, "5.0")); + + SMART_ENUM(MqttQoS, +(LEVEL_0, "0"), +(LEVEL_1, "1"), +(LEVEL_2, "2")); + EXTENSIONAPI static const core::Property BrokerURI; EXTENSIONAPI static const core::Property ClientID; + EXTENSIONAPI static const core::Property QoS; + EXTENSIONAPI static const core::Property MqttVersion; + EXTENSIONAPI static const core::Property ConnectionTimeout; + EXTENSIONAPI static const core::Property KeepAliveInterval; + EXTENSIONAPI static const core::Property LastWillTopic; + EXTENSIONAPI static const core::Property LastWillMessage; + EXTENSIONAPI static const core::Property LastWillQoS; + EXTENSIONAPI static const core::Property LastWillRetain; + EXTENSIONAPI static const core::Property LastWillContentType; EXTENSIONAPI static const core::Property Username; EXTENSIONAPI static const core::Property Password; - EXTENSIONAPI static const core::Property KeepAliveInterval; - EXTENSIONAPI static const core::Property MaxFlowSegSize; - EXTENSIONAPI static const core::Property ConnectionTimeout; - EXTENSIONAPI static const core::Property Topic; - EXTENSIONAPI static const core::Property QoS; EXTENSIONAPI static const core::Property SecurityProtocol; EXTENSIONAPI static const core::Property SecurityCA; EXTENSIONAPI static const core::Property SecurityCert; EXTENSIONAPI static const core::Property SecurityPrivateKey; EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword; - EXTENSIONAPI static const core::Property LastWillTopic; - EXTENSIONAPI static const core::Property LastWillMessage; - EXTENSIONAPI static const core::Property LastWillQoS; - EXTENSIONAPI static const core::Property LastWillRetain; - EXTENSIONAPI static auto properties() { + + static auto basicProperties() { +return std::array{ + BrokerURI, + ClientID, + MqttVersion +}; + } + + static auto advancedProperties() { return std::array{ -BrokerURI, -Topic, -ClientID, -QoS, -ConnectionTimeout, -KeepAliveInterval, -MaxFlowSegSize, -LastWillTopic, -LastWillMessage, -LastWillQoS, -LastWillRetain, -Username, -Password, -SecurityProtocol, -SecurityCA, -SecurityCert, -SecurityPrivateKey, -SecurityPrivateKeyPassword + QoS, + ConnectionTimeout, + KeepAliveInterval, + LastWillTopic, + LastWillMessage, + LastWillQoS, + LastWillRetain, + LastWillContentType, + Username, + Password, + SecurityProtocol, + SecurityCA, + SecurityCert, + SecurityPrivateKey, + SecurityPrivateKeyPassword }; } void onSchedule(const std::shared_ptr& context, const std::shared_ptr& factory) override; + void onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) override; void notifyStop() override { freeResources(); } protected: + struct MQTTMessageDeleter { +void operator()(MQTTAsync_message* message) { + MQTTAsync_freeMessage(); +} + }; + + struct SmartMessage { +std::unique_ptr contents; +std::string topic; + }; + + // defined by Paho MQTT C library + static constexpr int PAHO_MQTT_C_FAILURE_CODE = -999; + static constexpr int MQTT_MAX_RECEIVE_MAXIMUM = 65535; + + /** + * Connect to MQTT broker. Synchronously waits until connection succeeds or fails. + */ void reconnect(); + /** + * Checks property consistency before connecting to broker + */ + virtual void checkProperties() { + } + + /** + * Checks broker limits and supported features vs our desired features after connecting to broker + */ + void checkBrokerLimits(); + virtual void checkBrokerLimitsImpl() = 0; + + // variables being used for a synchronous connection and disconnection + std::shared_mutex client_mutex_; + MQTTAsync client_ = nullptr; std::string uri_; - std::string topic_; std::chrono::seconds keep_alive_interval_{60}; - uint64_t max_seg_size_ = std::numeric_limits::max(); - std::chrono::seconds connection_timeout_{30}; - uint32_t qos_ = MQTT_QOS_1; + std::chrono::seconds connection_timeout_{10}; + MqttQoS qos_{MqttQoS::LEVEL_0}; std::string clientID_; std::string username_; std::string password_; + MqttVersions mqtt_version_{MqttVersions::V_3X_AUTO}; - private: - // MQTT async callback - static int msgReceived(void *context, char* topic_name, int topic_len,
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1042214797 ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -137,19 +148,45 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptrlog_error("MQTT client is not existing while trying to reconnect"); -return; +throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect"); } if (MQTTAsync_isConnected(client_)) { -logger_->log_info("Already connected to %s, no need to reconnect", uri_); +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); return; } - MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + + MQTTAsync_connectOptions conn_opts; + MQTTProperties connect_props = MQTTProperties_initializer; + MQTTProperties will_props = MQTTProperties_initializer; + + if (mqtt_version_.value() == MqttVersions::V_5_0) { +conn_opts = MQTTAsync_connectOptions_initializer5; +conn_opts.onSuccess5 = connectionSuccess5; +conn_opts.onFailure5 = connectionFailure5; +conn_opts.connectProperties = _props; + } else { +conn_opts = MQTTAsync_connectOptions_initializer; +conn_opts.onSuccess = connectionSuccess; +conn_opts.onFailure = connectionFailure; + } + + if (mqtt_version_.value() == MqttVersions::V_3_1_0) { +conn_opts.MQTTVersion = MQTTVERSION_3_1; + } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) { +conn_opts.MQTTVersion = MQTTVERSION_3_1_1; + } + conn_opts.keepAliveInterval = gsl::narrow(keep_alive_interval_.count()); - conn_opts.cleansession = getCleanSession(); - conn_opts.context = this; - conn_opts.onSuccess = connectionSuccess; - conn_opts.onFailure = connectionFailure; + if (mqtt_version_.value() == MqttVersions::V_5_0) { +setMqtt5ConnectOptions(conn_opts, connect_props, will_props); Review Comment: Done. ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() { } logger_->log_info("Reconnecting to %s", uri_); - int ret = MQTTAsync_connect(client_, _opts); + if (MQTTAsync_isConnected(client_)) { +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); +return; + } + const int ret = MQTTAsync_connect(client_, _opts); + MQTTProperties_free(_props); if (ret != MQTTASYNC_SUCCESS) { -logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret); +logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret); +return; } + + // wait until connection succeeds or fails + connect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const { + conn_opts.cleanstart = getCleanStart(); + + { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL; +property.value.integer4 = gsl::narrow(getSessionExpiryInterval().count()); +MQTTProperties_add(_props, ); + } + + if (!last_will_content_type_.empty()) { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE; +property.value.data.len = last_will_content_type_.length(); +property.value.data.data = const_cast(last_will_content_type_.data()); +MQTTProperties_add(_props, ); + } + + conn_opts.willProperties = _props; + + setMqtt5ConnectOptionsImpl(connect_props); +} + +void AbstractMQTTProcessor::onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) { + // read lock + std::shared_lock client_lock{client_mutex_}; + if (client_ == nullptr) { +// we are shutting down Review Comment: Done. ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() { } logger_->log_info("Reconnecting to %s", uri_); - int ret = MQTTAsync_connect(client_, _opts); + if (MQTTAsync_isConnected(client_)) { +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); +return; + } + const int ret = MQTTAsync_connect(client_, _opts); + MQTTProperties_free(_props); if (ret != MQTTASYNC_SUCCESS) { -logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret); +logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret); +return; } + + // wait until connection succeeds or fails + connect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const { + conn_opts.cleanstart = getCleanStart(); + + { +MQTTProperty property; +property.identifier =
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1042144555 ## PROCESSORS.md: ## @@ -337,28 +337,34 @@ This Processor gets the contents of a FlowFile from a MQTT broker for a specifie In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. -| Name | Default Value | Allowable Values | Description | -|---|---|--|-| -| **Broker URI**| | | The URI to use to connect to the MQTT broker | -| **Topic** | | | The topic to subscribe to | -| Client ID | | | MQTT client ID to use | -| Quality of Service| 0 | | The Quality of Service (QoS) to receive the message with. Accepts three values '0', '1' and '2' | -| Connection Timeout| 30 sec| | Maximum time interval the client will wait for the network connection to the MQTT broker | -| Keep Alive Interval | 60 sec| | Defines the maximum time interval between messages being sent to the broker | -| Max Flow Segment Size | | | Maximum flow content payload segment size for the MQTT record | -| Last Will Topic | | | The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent| -| Last Will Message | | | The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker | -| Last Will QoS | 0 | | The Quality of Service (QoS) to send the last will with. Accepts three values '0', '1' and '2' | -| Last Will Retain | false | | Whether to retain the client's Last Will | -| Security Protocol | | | Protocol used to communicate with brokers | -| Security CA | | | File or directory path to CA certificate(s) for verifying the broker's key | -| Security Cert | | | Path to client's public key (PEM) used for authentication | -| Security Private Key | | | Path to client's private key (PEM) used for authentication | -| Security Pass Phrase | | | Private key passphrase | -| Username | | | Username to use when connecting to the broker | -| Password | | | Password to use when connecting to the broker | -| Clean Session | true | | Whether to start afresh rather than remembering previous subscriptions | -| Queue Max Message | 1000 | | Maximum number of messages allowed on the received MQTT queue | +| Name| Default Value | Allowable Values| Description |
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1042119988 ## extensions/mqtt/processors/PublishMQTT.cpp: ## @@ -34,76 +34,229 @@ void PublishMQTT::initialize() { setSupportedRelationships(relationships()); } -void PublishMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void PublishMQTT::readProperties(const std::shared_ptr& context) { + if (!context->getProperty(Topic).has_value()) { +logger_->log_error("PublishMQTT: could not get Topic"); + } Review Comment: Update: if it's empty, even expression language cannot help. I am putting it back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041328112 ## extensions/mqtt/processors/PublishMQTT.cpp: ## @@ -34,76 +34,229 @@ void PublishMQTT::initialize() { setSupportedRelationships(relationships()); } -void PublishMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void PublishMQTT::readProperties(const std::shared_ptr& context) { + if (!context->getProperty(Topic).has_value()) { +logger_->log_error("PublishMQTT: could not get Topic"); + } + if (const auto retain_opt = context->getProperty(Retain)) { retain_ = *retain_opt; } logger_->log_debug("PublishMQTT: Retain [%d]", retain_); - AbstractMQTTProcessor::onSchedule(context, factory); + if (const auto message_expiry_interval = context->getProperty(MessageExpiryInterval)) { +message_expiry_interval_ = std::chrono::duration_cast(message_expiry_interval->getMilliseconds()); +logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()}); + } + + in_flight_message_counter_.setMqttVersion(mqtt_version_); + in_flight_message_counter_.setQoS(qos_); } -void PublishMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); +void PublishMQTT::onTriggerImpl(const std::shared_ptr& context, const std::shared_ptr ) { + std::shared_ptr flow_file = session->get(); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_); -yield(); + if (!flow_file) { return; } - std::shared_ptr flowFile = session->get(); + // broker's Receive Maximum can change after reconnect + in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM)); - if (!flowFile) { -return; + const auto topic = getTopic(context, flow_file); + try { +const auto result = session->readBuffer(flow_file); +if (result.status < 0 || !sendMessage(result.buffer, topic, getContentType(context, flow_file), flow_file)) { + logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), topic, uri_); + session->transfer(flow_file, Failure); + return; +} +logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_); +session->transfer(flow_file, Success); + } catch (const Exception& ex) { +logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, ex.what()); +session->transfer(flow_file, Failure); + } +} + +bool PublishMQTT::sendMessage(const std::vector& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr& flow_file) { + if (buffer.size() > 268'435'455) { +logger_->log_error("Sending message failed because MQTT limit maximum packet size [268'435'455] is exceeded by FlowFile of [%zu]", buffer.size()); } - PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, topic_, client_, gsl::narrow(qos_), retain_); - session->read(flowFile, std::ref(callback)); - if (callback.status_ < 0) { -logger_->log_error("Failed to send flow to MQTT topic %s", topic_); -session->transfer(flowFile, Failure); + if (maximum_packet_size_.has_value() && buffer.size() > *(maximum_packet_size_)) { +logger_->log_error("Sending message failed because broker-requested maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%zu]", + *maximum_packet_size_, buffer.size()); + } + + MQTTAsync_message message_to_publish = MQTTAsync_message_initializer; + message_to_publish.payload = const_cast(buffer.data()); + message_to_publish.payloadlen = buffer.size(); + message_to_publish.qos = qos_.value(); + message_to_publish.retained = retain_; + + setMqtt5Properties(message_to_publish, content_type, flow_file); + + MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer; + if (mqtt_version_ == MqttVersions::V_5_0) { +response_options.onSuccess5 = sendSuccess5; +response_options.onFailure5 = sendFailure5; } else { -logger_->log_debug("Sent flow with length %d to MQTT topic %s", callback.read_size_, topic_); -session->transfer(flowFile, Success); - } -} - -int64_t PublishMQTT::ReadCallback::operator()(const std::shared_ptr& stream) { - if (flow_size_ < max_seg_size_) -max_seg_size_ = flow_size_; - gsl_Expects(max_seg_size_ < gsl::narrow(std::numeric_limits::max())); - std::vector buffer(max_seg_size_); - read_size_ = 0; - status_ = 0; - while (read_size_ < flow_size_) { -// MQTTClient_message::payloadlen is int, so we can't handle 2GB+ -const auto readRet = stream->read(buffer); -
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041315835 ## extensions/mqtt/processors/PublishMQTT.cpp: ## @@ -34,76 +34,229 @@ void PublishMQTT::initialize() { setSupportedRelationships(relationships()); } -void PublishMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void PublishMQTT::readProperties(const std::shared_ptr& context) { + if (!context->getProperty(Topic).has_value()) { +logger_->log_error("PublishMQTT: could not get Topic"); + } + if (const auto retain_opt = context->getProperty(Retain)) { retain_ = *retain_opt; } logger_->log_debug("PublishMQTT: Retain [%d]", retain_); - AbstractMQTTProcessor::onSchedule(context, factory); + if (const auto message_expiry_interval = context->getProperty(MessageExpiryInterval)) { +message_expiry_interval_ = std::chrono::duration_cast(message_expiry_interval->getMilliseconds()); +logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()}); + } + + in_flight_message_counter_.setMqttVersion(mqtt_version_); + in_flight_message_counter_.setQoS(qos_); } -void PublishMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); +void PublishMQTT::onTriggerImpl(const std::shared_ptr& context, const std::shared_ptr ) { + std::shared_ptr flow_file = session->get(); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_); -yield(); + if (!flow_file) { return; } - std::shared_ptr flowFile = session->get(); + // broker's Receive Maximum can change after reconnect + in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM)); - if (!flowFile) { -return; + const auto topic = getTopic(context, flow_file); + try { +const auto result = session->readBuffer(flow_file); +if (result.status < 0 || !sendMessage(result.buffer, topic, getContentType(context, flow_file), flow_file)) { + logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), topic, uri_); + session->transfer(flow_file, Failure); + return; +} +logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_); +session->transfer(flow_file, Success); + } catch (const Exception& ex) { +logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, ex.what()); +session->transfer(flow_file, Failure); + } +} + +bool PublishMQTT::sendMessage(const std::vector& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr& flow_file) { + if (buffer.size() > 268'435'455) { +logger_->log_error("Sending message failed because MQTT limit maximum packet size [268'435'455] is exceeded by FlowFile of [%zu]", buffer.size()); } - PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, topic_, client_, gsl::narrow(qos_), retain_); - session->read(flowFile, std::ref(callback)); - if (callback.status_ < 0) { -logger_->log_error("Failed to send flow to MQTT topic %s", topic_); -session->transfer(flowFile, Failure); + if (maximum_packet_size_.has_value() && buffer.size() > *(maximum_packet_size_)) { +logger_->log_error("Sending message failed because broker-requested maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%zu]", + *maximum_packet_size_, buffer.size()); + } + + MQTTAsync_message message_to_publish = MQTTAsync_message_initializer; + message_to_publish.payload = const_cast(buffer.data()); + message_to_publish.payloadlen = buffer.size(); + message_to_publish.qos = qos_.value(); + message_to_publish.retained = retain_; + + setMqtt5Properties(message_to_publish, content_type, flow_file); + + MQTTAsync_responseOptions response_options = MQTTAsync_responseOptions_initializer; + if (mqtt_version_ == MqttVersions::V_5_0) { +response_options.onSuccess5 = sendSuccess5; +response_options.onFailure5 = sendFailure5; } else { -logger_->log_debug("Sent flow with length %d to MQTT topic %s", callback.read_size_, topic_); -session->transfer(flowFile, Success); - } -} - -int64_t PublishMQTT::ReadCallback::operator()(const std::shared_ptr& stream) { - if (flow_size_ < max_seg_size_) -max_seg_size_ = flow_size_; - gsl_Expects(max_seg_size_ < gsl::narrow(std::numeric_limits::max())); - std::vector buffer(max_seg_size_); - read_size_ = 0; - status_ = 0; - while (read_size_ < flow_size_) { -// MQTTClient_message::payloadlen is int, so we can't handle 2GB+ -const auto readRet = stream->read(buffer); -
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041313305 ## extensions/mqtt/processors/PublishMQTT.cpp: ## @@ -34,76 +34,229 @@ void PublishMQTT::initialize() { setSupportedRelationships(relationships()); } -void PublishMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void PublishMQTT::readProperties(const std::shared_ptr& context) { + if (!context->getProperty(Topic).has_value()) { +logger_->log_error("PublishMQTT: could not get Topic"); + } + if (const auto retain_opt = context->getProperty(Retain)) { retain_ = *retain_opt; } logger_->log_debug("PublishMQTT: Retain [%d]", retain_); - AbstractMQTTProcessor::onSchedule(context, factory); + if (const auto message_expiry_interval = context->getProperty(MessageExpiryInterval)) { +message_expiry_interval_ = std::chrono::duration_cast(message_expiry_interval->getMilliseconds()); +logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()}); + } + + in_flight_message_counter_.setMqttVersion(mqtt_version_); + in_flight_message_counter_.setQoS(qos_); } -void PublishMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); +void PublishMQTT::onTriggerImpl(const std::shared_ptr& context, const std::shared_ptr ) { + std::shared_ptr flow_file = session->get(); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_); -yield(); + if (!flow_file) { return; } - std::shared_ptr flowFile = session->get(); + // broker's Receive Maximum can change after reconnect + in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM)); - if (!flowFile) { -return; + const auto topic = getTopic(context, flow_file); + try { +const auto result = session->readBuffer(flow_file); +if (result.status < 0 || !sendMessage(result.buffer, topic, getContentType(context, flow_file), flow_file)) { + logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), topic, uri_); + session->transfer(flow_file, Failure); + return; +} +logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_); +session->transfer(flow_file, Success); + } catch (const Exception& ex) { +logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, ex.what()); +session->transfer(flow_file, Failure); + } +} + +bool PublishMQTT::sendMessage(const std::vector& buffer, const std::string& topic, const std::string& content_type, const std::shared_ptr& flow_file) { + if (buffer.size() > 268'435'455) { Review Comment: It is coming from the MQTT specification. Remaining Length field is a variable byte integer: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901024 The maximum value of a Variable Byte Integer is 268,435,455. I found the decimal representation all over the internet, so I think we should stay with the one more commonly used. https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011 But I'm extracting it to a constant as requested, I don't like magic numbers either. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041290532 ## extensions/mqtt/processors/PublishMQTT.cpp: ## @@ -34,76 +34,229 @@ void PublishMQTT::initialize() { setSupportedRelationships(relationships()); } -void PublishMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void PublishMQTT::readProperties(const std::shared_ptr& context) { + if (!context->getProperty(Topic).has_value()) { +logger_->log_error("PublishMQTT: could not get Topic"); + } + if (const auto retain_opt = context->getProperty(Retain)) { retain_ = *retain_opt; } logger_->log_debug("PublishMQTT: Retain [%d]", retain_); - AbstractMQTTProcessor::onSchedule(context, factory); + if (const auto message_expiry_interval = context->getProperty(MessageExpiryInterval)) { +message_expiry_interval_ = std::chrono::duration_cast(message_expiry_interval->getMilliseconds()); +logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()}); + } + + in_flight_message_counter_.setMqttVersion(mqtt_version_); + in_flight_message_counter_.setQoS(qos_); } -void PublishMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); +void PublishMQTT::onTriggerImpl(const std::shared_ptr& context, const std::shared_ptr ) { + std::shared_ptr flow_file = session->get(); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_); -yield(); + if (!flow_file) { return; Review Comment: Good point! Added. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041288113 ## extensions/mqtt/processors/PublishMQTT.cpp: ## @@ -34,76 +34,229 @@ void PublishMQTT::initialize() { setSupportedRelationships(relationships()); } -void PublishMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void PublishMQTT::readProperties(const std::shared_ptr& context) { + if (!context->getProperty(Topic).has_value()) { +logger_->log_error("PublishMQTT: could not get Topic"); + } Review Comment: `Topic` is an expression language property. So you are right, it can become valid later. I am removing this error message. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041281537 ## extensions/mqtt/processors/ConsumeMQTT.cpp: ## @@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() { setSupportedRelationships(relationships()); } -void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) { - if (queue_.size_approx() >= maxQueueSize_) { -logger_->log_warn("MQTT queue full"); +void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) { + if (queue_.size_approx() >= max_queue_size_) { +logger_->log_error("MQTT queue full"); return; } - if (gsl::narrow(message->payloadlen) > max_seg_size_) { -logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen); -message->payloadlen = gsl::narrow(max_seg_size_); - } - - logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen); + logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen); queue_.enqueue(std::move(message)); } -void ConsumeMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void ConsumeMQTT::readProperties(const std::shared_ptr& context) { + if (auto value = context->getProperty(Topic)) { +topic_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_); + if (const auto value = context->getProperty(CleanSession)) { -cleanSession_ = *value; -logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_); +clean_session_ = *value; } + logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_); + + if (const auto value = context->getProperty(CleanStart)) { +clean_start_ = *value; + } + logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_); + + if (const auto session_expiry_interval = context->getProperty(SessionExpiryInterval)) { +session_expiry_interval_ = std::chrono::duration_cast(session_expiry_interval->getMilliseconds()); + } + logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()}); if (const auto value = context->getProperty(QueueBufferMaxMessage)) { -maxQueueSize_ = *value; -logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_); +max_queue_size_ = *value; } + logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_); - // this connects to broker, so properties of this processor must be read before - AbstractMQTTProcessor::onSchedule(context, factory); -} + if (auto value = context->getProperty(AttributeFromContentType)) { +attribute_from_content_type_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_); -void ConsumeMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); + if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum)) { +topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum); + } + logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_); -yield(); -return; + if (const auto receive_maximum = context->getProperty(ReceiveMaximum)) { +receive_maximum_ = gsl::narrow(*receive_maximum); } + logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_); +} - std::deque> msg_queue; - getReceivedMQTTMsg(msg_queue); +void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& /*context*/, const std::shared_ptr& session) { + std::queue msg_queue = getReceivedMqttMessages(); while (!msg_queue.empty()) { const auto& message = msg_queue.front(); -std::shared_ptr processFlowFile = session->create(); -int write_status{}; -session->write(processFlowFile, [, _status](const std::shared_ptr& stream) -> int64_t { - if (message->payloadlen < 0) { -write_status = -1; -return -1; - } - const auto len = stream->write(reinterpret_cast(message->payload), gsl::narrow(message->payloadlen)); - if (io::isError(len)) { -write_status = -1; -return -1; - } - return gsl::narrow(len); -}); -if (write_status < 0) { - logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr()); - session->remove(processFlowFile); +std::shared_ptr flow_file = session->create(); +WriteCallback write_callback(message, logger_); +try { + session->write(flow_file, write_callback); +} catch (const Exception& ex) { + logger_->log_error("Error when processing message queue: %s", ex.what()); +} +if (!write_callback.getSuccessStatus()) { +
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041281303 ## extensions/mqtt/processors/ConsumeMQTT.cpp: ## @@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() { setSupportedRelationships(relationships()); } -void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) { - if (queue_.size_approx() >= maxQueueSize_) { -logger_->log_warn("MQTT queue full"); +void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) { + if (queue_.size_approx() >= max_queue_size_) { +logger_->log_error("MQTT queue full"); return; } - if (gsl::narrow(message->payloadlen) > max_seg_size_) { -logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen); -message->payloadlen = gsl::narrow(max_seg_size_); - } - - logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen); + logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen); queue_.enqueue(std::move(message)); } -void ConsumeMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void ConsumeMQTT::readProperties(const std::shared_ptr& context) { + if (auto value = context->getProperty(Topic)) { +topic_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_); + if (const auto value = context->getProperty(CleanSession)) { -cleanSession_ = *value; -logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_); +clean_session_ = *value; } + logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_); + + if (const auto value = context->getProperty(CleanStart)) { +clean_start_ = *value; + } + logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_); + + if (const auto session_expiry_interval = context->getProperty(SessionExpiryInterval)) { +session_expiry_interval_ = std::chrono::duration_cast(session_expiry_interval->getMilliseconds()); + } + logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()}); if (const auto value = context->getProperty(QueueBufferMaxMessage)) { -maxQueueSize_ = *value; -logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_); +max_queue_size_ = *value; } + logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_); - // this connects to broker, so properties of this processor must be read before - AbstractMQTTProcessor::onSchedule(context, factory); -} + if (auto value = context->getProperty(AttributeFromContentType)) { +attribute_from_content_type_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_); -void ConsumeMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); + if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum)) { +topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum); + } + logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_); -yield(); -return; + if (const auto receive_maximum = context->getProperty(ReceiveMaximum)) { +receive_maximum_ = gsl::narrow(*receive_maximum); } + logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_); +} - std::deque> msg_queue; - getReceivedMQTTMsg(msg_queue); +void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& /*context*/, const std::shared_ptr& session) { + std::queue msg_queue = getReceivedMqttMessages(); while (!msg_queue.empty()) { const auto& message = msg_queue.front(); -std::shared_ptr processFlowFile = session->create(); -int write_status{}; -session->write(processFlowFile, [, _status](const std::shared_ptr& stream) -> int64_t { - if (message->payloadlen < 0) { -write_status = -1; -return -1; - } - const auto len = stream->write(reinterpret_cast(message->payload), gsl::narrow(message->payloadlen)); - if (io::isError(len)) { -write_status = -1; -return -1; - } - return gsl::narrow(len); -}); -if (write_status < 0) { - logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr()); - session->remove(processFlowFile); +std::shared_ptr flow_file = session->create(); +WriteCallback write_callback(message, logger_); +try { + session->write(flow_file, write_callback); +} catch (const Exception& ex) { + logger_->log_error("Error when processing message queue: %s", ex.what()); +} +if (!write_callback.getSuccessStatus()) { +
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041280626 ## extensions/mqtt/processors/ConsumeMQTT.cpp: ## @@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() { setSupportedRelationships(relationships()); } -void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) { - if (queue_.size_approx() >= maxQueueSize_) { -logger_->log_warn("MQTT queue full"); +void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) { + if (queue_.size_approx() >= max_queue_size_) { +logger_->log_error("MQTT queue full"); return; } - if (gsl::narrow(message->payloadlen) > max_seg_size_) { -logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen); -message->payloadlen = gsl::narrow(max_seg_size_); - } - - logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen); + logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen); queue_.enqueue(std::move(message)); } -void ConsumeMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void ConsumeMQTT::readProperties(const std::shared_ptr& context) { + if (auto value = context->getProperty(Topic)) { +topic_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_); + if (const auto value = context->getProperty(CleanSession)) { -cleanSession_ = *value; -logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_); +clean_session_ = *value; } + logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_); + + if (const auto value = context->getProperty(CleanStart)) { +clean_start_ = *value; + } + logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_); + + if (const auto session_expiry_interval = context->getProperty(SessionExpiryInterval)) { +session_expiry_interval_ = std::chrono::duration_cast(session_expiry_interval->getMilliseconds()); + } + logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()}); if (const auto value = context->getProperty(QueueBufferMaxMessage)) { -maxQueueSize_ = *value; -logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_); +max_queue_size_ = *value; } + logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_); - // this connects to broker, so properties of this processor must be read before - AbstractMQTTProcessor::onSchedule(context, factory); -} + if (auto value = context->getProperty(AttributeFromContentType)) { +attribute_from_content_type_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_); -void ConsumeMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); + if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum)) { +topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum); + } + logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_); -yield(); -return; + if (const auto receive_maximum = context->getProperty(ReceiveMaximum)) { +receive_maximum_ = gsl::narrow(*receive_maximum); } + logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_); +} - std::deque> msg_queue; - getReceivedMQTTMsg(msg_queue); +void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& /*context*/, const std::shared_ptr& session) { + std::queue msg_queue = getReceivedMqttMessages(); while (!msg_queue.empty()) { const auto& message = msg_queue.front(); -std::shared_ptr processFlowFile = session->create(); -int write_status{}; -session->write(processFlowFile, [, _status](const std::shared_ptr& stream) -> int64_t { - if (message->payloadlen < 0) { -write_status = -1; -return -1; - } - const auto len = stream->write(reinterpret_cast(message->payload), gsl::narrow(message->payloadlen)); - if (io::isError(len)) { -write_status = -1; -return -1; - } - return gsl::narrow(len); -}); -if (write_status < 0) { - logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr()); - session->remove(processFlowFile); +std::shared_ptr flow_file = session->create(); +WriteCallback write_callback(message, logger_); +try { + session->write(flow_file, write_callback); +} catch (const Exception& ex) { + logger_->log_error("Error when processing message queue: %s", ex.what()); +} +if (!write_callback.getSuccessStatus()) { +
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041268666 ## extensions/mqtt/processors/ConsumeMQTT.cpp: ## @@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() { setSupportedRelationships(relationships()); } -void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) { - if (queue_.size_approx() >= maxQueueSize_) { -logger_->log_warn("MQTT queue full"); +void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) { + if (queue_.size_approx() >= max_queue_size_) { +logger_->log_error("MQTT queue full"); return; } - if (gsl::narrow(message->payloadlen) > max_seg_size_) { -logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen); -message->payloadlen = gsl::narrow(max_seg_size_); - } - - logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen); + logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen); queue_.enqueue(std::move(message)); } -void ConsumeMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void ConsumeMQTT::readProperties(const std::shared_ptr& context) { + if (auto value = context->getProperty(Topic)) { +topic_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_); + if (const auto value = context->getProperty(CleanSession)) { -cleanSession_ = *value; -logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_); +clean_session_ = *value; } + logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_); + + if (const auto value = context->getProperty(CleanStart)) { +clean_start_ = *value; + } + logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_); + + if (const auto session_expiry_interval = context->getProperty(SessionExpiryInterval)) { +session_expiry_interval_ = std::chrono::duration_cast(session_expiry_interval->getMilliseconds()); + } + logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()}); if (const auto value = context->getProperty(QueueBufferMaxMessage)) { -maxQueueSize_ = *value; -logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_); +max_queue_size_ = *value; } + logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_); - // this connects to broker, so properties of this processor must be read before - AbstractMQTTProcessor::onSchedule(context, factory); -} + if (auto value = context->getProperty(AttributeFromContentType)) { +attribute_from_content_type_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_); -void ConsumeMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); + if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum)) { +topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum); + } + logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_); -yield(); -return; + if (const auto receive_maximum = context->getProperty(ReceiveMaximum)) { +receive_maximum_ = gsl::narrow(*receive_maximum); } + logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_); +} - std::deque> msg_queue; - getReceivedMQTTMsg(msg_queue); +void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& /*context*/, const std::shared_ptr& session) { + std::queue msg_queue = getReceivedMqttMessages(); while (!msg_queue.empty()) { const auto& message = msg_queue.front(); -std::shared_ptr processFlowFile = session->create(); -int write_status{}; -session->write(processFlowFile, [, _status](const std::shared_ptr& stream) -> int64_t { - if (message->payloadlen < 0) { -write_status = -1; -return -1; - } - const auto len = stream->write(reinterpret_cast(message->payload), gsl::narrow(message->payloadlen)); - if (io::isError(len)) { -write_status = -1; -return -1; - } - return gsl::narrow(len); -}); -if (write_status < 0) { - logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr()); - session->remove(processFlowFile); +std::shared_ptr flow_file = session->create(); +WriteCallback write_callback(message, logger_); +try { + session->write(flow_file, write_callback); +} catch (const Exception& ex) { + logger_->log_error("Error when processing message queue: %s", ex.what()); +} +if (!write_callback.getSuccessStatus()) { +
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041258769 ## extensions/mqtt/processors/ConsumeMQTT.cpp: ## @@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() { setSupportedRelationships(relationships()); } -void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) { - if (queue_.size_approx() >= maxQueueSize_) { -logger_->log_warn("MQTT queue full"); +void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) { + if (queue_.size_approx() >= max_queue_size_) { +logger_->log_error("MQTT queue full"); return; } - if (gsl::narrow(message->payloadlen) > max_seg_size_) { -logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen); -message->payloadlen = gsl::narrow(max_seg_size_); - } - - logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen); + logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen); queue_.enqueue(std::move(message)); } -void ConsumeMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void ConsumeMQTT::readProperties(const std::shared_ptr& context) { + if (auto value = context->getProperty(Topic)) { +topic_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_); + if (const auto value = context->getProperty(CleanSession)) { -cleanSession_ = *value; -logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_); +clean_session_ = *value; } + logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_); + + if (const auto value = context->getProperty(CleanStart)) { +clean_start_ = *value; + } + logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_); + + if (const auto session_expiry_interval = context->getProperty(SessionExpiryInterval)) { +session_expiry_interval_ = std::chrono::duration_cast(session_expiry_interval->getMilliseconds()); + } + logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()}); if (const auto value = context->getProperty(QueueBufferMaxMessage)) { -maxQueueSize_ = *value; -logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_); +max_queue_size_ = *value; } + logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_); - // this connects to broker, so properties of this processor must be read before - AbstractMQTTProcessor::onSchedule(context, factory); -} + if (auto value = context->getProperty(AttributeFromContentType)) { +attribute_from_content_type_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_); -void ConsumeMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); + if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum)) { +topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum); + } + logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_); -yield(); -return; + if (const auto receive_maximum = context->getProperty(ReceiveMaximum)) { +receive_maximum_ = gsl::narrow(*receive_maximum); } + logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_); +} - std::deque> msg_queue; - getReceivedMQTTMsg(msg_queue); +void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& /*context*/, const std::shared_ptr& session) { + std::queue msg_queue = getReceivedMqttMessages(); while (!msg_queue.empty()) { const auto& message = msg_queue.front(); -std::shared_ptr processFlowFile = session->create(); -int write_status{}; -session->write(processFlowFile, [, _status](const std::shared_ptr& stream) -> int64_t { - if (message->payloadlen < 0) { -write_status = -1; -return -1; - } - const auto len = stream->write(reinterpret_cast(message->payload), gsl::narrow(message->payloadlen)); - if (io::isError(len)) { -write_status = -1; -return -1; - } - return gsl::narrow(len); -}); -if (write_status < 0) { - logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr()); - session->remove(processFlowFile); +std::shared_ptr flow_file = session->create(); +WriteCallback write_callback(message, logger_); +try { + session->write(flow_file, write_callback); Review Comment: Good catch! I tested it on dummy code, `std::ref` will really be necessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041159887 ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() { } logger_->log_info("Reconnecting to %s", uri_); - int ret = MQTTAsync_connect(client_, _opts); + if (MQTTAsync_isConnected(client_)) { +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); +return; + } + const int ret = MQTTAsync_connect(client_, _opts); + MQTTProperties_free(_props); if (ret != MQTTASYNC_SUCCESS) { -logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret); +logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret); +return; } + + // wait until connection succeeds or fails + connect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const { + conn_opts.cleanstart = getCleanStart(); + + { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL; +property.value.integer4 = gsl::narrow(getSessionExpiryInterval().count()); +MQTTProperties_add(_props, ); + } + + if (!last_will_content_type_.empty()) { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE; +property.value.data.len = last_will_content_type_.length(); +property.value.data.data = const_cast(last_will_content_type_.data()); +MQTTProperties_add(_props, ); + } + + conn_opts.willProperties = _props; + + setMqtt5ConnectOptionsImpl(connect_props); +} + +void AbstractMQTTProcessor::onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) { + // read lock + std::shared_lock client_lock{client_mutex_}; + if (client_ == nullptr) { +// we are shutting down +return; + } + + // reconnect if needed + reconnect(); + + if (!MQTTAsync_isConnected(client_)) { +logger_->log_error("Could not work with MQTT broker because disconnected to %s", uri_); +yield(); +return; + } + + onTriggerImpl(context, session); } void AbstractMQTTProcessor::freeResources() { - if (client_ && MQTTAsync_isConnected(client_)) { -MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer; -disconnect_options.context = this; -disconnect_options.onSuccess = disconnectionSuccess; -disconnect_options.onFailure = disconnectionFailure; -disconnect_options.timeout = gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count()); -MQTTAsync_disconnect(client_, _options); + // write lock + std::lock_guard client_lock{client_mutex_}; + + if (!client_) { +return; } - if (client_) { -MQTTAsync_destroy(_); + + disconnect(); + + MQTTAsync_destroy(_); +} + +void AbstractMQTTProcessor::disconnect() { + if (!MQTTAsync_isConnected(client_)) { +return; + } + + MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer; + std::packaged_task disconnect_finished_task( + [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) { +onDisconnectFinished(success_data, success_data_5, failure_data, failure_data_5); + }); + disconnect_options.context = _finished_task; + + if (mqtt_version_.value() == MqttVersions::V_5_0) { +disconnect_options.onSuccess5 = connectionSuccess5; +disconnect_options.onFailure5 = connectionFailure5; + } else { +disconnect_options.onSuccess = connectionSuccess; +disconnect_options.onFailure = connectionFailure; + } + + disconnect_options.timeout = gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count()); + + const int ret = MQTTAsync_disconnect(client_, _options); + if (ret != MQTTASYNC_SUCCESS) { +logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with error code [%d]", uri_, ret); +return; + } + + // wait until connection succeeds or fails + disconnect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) { + auto readProperty = [response] (MQTTPropertyCodes property_code, auto& out_var) { +const int value = MQTTProperties_getNumericValue(>properties, property_code); +if (value != PAHO_MQTT_C_FAILURE_CODE) { + if constexpr (std::is_same_v&>) { +out_var = std::chrono::seconds(value); + } else { +out_var = gsl::narrow::value_type>(value); + } +} else { + out_var.reset(); +} + }; + + readProperty(MQTTPROPERTY_CODE_RETAIN_AVAILABLE, retain_available_); + readProperty(MQTTPROPERTY_CODE_WILDCARD_SUBSCRIPTION_AVAILABLE, wildcard_subscription_available_); +
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041148586 ## libminifi/include/utils/Enum.h: ## @@ -127,7 +128,7 @@ namespace utils { #define SMART_ENUM(Clazz, ...) \ struct Clazz { \ using Base = ::org::apache::nifi::minifi::utils::EnumBase; \ -enum Type { \ +enum Type : int { \ Review Comment: It would still not enable `SMART_ENUM` initialization from integer without a cast. Only it's `Type` structure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1040850939 ## extensions/mqtt/processors/AbstractMQTTProcessor.h: ## @@ -46,150 +44,178 @@ class AbstractMQTTProcessor : public core::Processor { freeResources(); } + SMART_ENUM(MqttVersions, +(V_3X_AUTO, "3.x AUTO"), +(V_3_1_0, "3.1.0"), +(V_3_1_1, "3.1.1"), +(V_5_0, "5.0")); + + SMART_ENUM(MqttQoS, +(LEVEL_0, "0"), +(LEVEL_1, "1"), +(LEVEL_2, "2")); + EXTENSIONAPI static const core::Property BrokerURI; EXTENSIONAPI static const core::Property ClientID; + EXTENSIONAPI static const core::Property QoS; + EXTENSIONAPI static const core::Property MqttVersion; + EXTENSIONAPI static const core::Property ConnectionTimeout; + EXTENSIONAPI static const core::Property KeepAliveInterval; + EXTENSIONAPI static const core::Property LastWillTopic; + EXTENSIONAPI static const core::Property LastWillMessage; + EXTENSIONAPI static const core::Property LastWillQoS; + EXTENSIONAPI static const core::Property LastWillRetain; + EXTENSIONAPI static const core::Property LastWillContentType; EXTENSIONAPI static const core::Property Username; EXTENSIONAPI static const core::Property Password; - EXTENSIONAPI static const core::Property KeepAliveInterval; - EXTENSIONAPI static const core::Property MaxFlowSegSize; - EXTENSIONAPI static const core::Property ConnectionTimeout; - EXTENSIONAPI static const core::Property Topic; - EXTENSIONAPI static const core::Property QoS; EXTENSIONAPI static const core::Property SecurityProtocol; EXTENSIONAPI static const core::Property SecurityCA; EXTENSIONAPI static const core::Property SecurityCert; EXTENSIONAPI static const core::Property SecurityPrivateKey; EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword; - EXTENSIONAPI static const core::Property LastWillTopic; - EXTENSIONAPI static const core::Property LastWillMessage; - EXTENSIONAPI static const core::Property LastWillQoS; - EXTENSIONAPI static const core::Property LastWillRetain; - EXTENSIONAPI static auto properties() { + + static auto basicProperties() { +return std::array{ + BrokerURI, + ClientID, + MqttVersion +}; + } + + static auto advancedProperties() { return std::array{ -BrokerURI, -Topic, -ClientID, -QoS, -ConnectionTimeout, -KeepAliveInterval, -MaxFlowSegSize, -LastWillTopic, -LastWillMessage, -LastWillQoS, -LastWillRetain, -Username, -Password, -SecurityProtocol, -SecurityCA, -SecurityCert, -SecurityPrivateKey, -SecurityPrivateKeyPassword + QoS, + ConnectionTimeout, + KeepAliveInterval, + LastWillTopic, + LastWillMessage, + LastWillQoS, + LastWillRetain, + LastWillContentType, + Username, + Password, + SecurityProtocol, + SecurityCA, + SecurityCert, + SecurityPrivateKey, + SecurityPrivateKeyPassword }; } void onSchedule(const std::shared_ptr& context, const std::shared_ptr& factory) override; + void onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) override; void notifyStop() override { freeResources(); } protected: + struct MQTTMessageDeleter { +void operator()(MQTTAsync_message* message) { + MQTTAsync_freeMessage(); Review Comment: It is required, the signature is `void MQTTAsync_freeMessage(MQTTAsync_message** msg)`: https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/_m_q_t_t_async_8h.html#a9b45db63052fe29ab1fad22d2a00c91c -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1040845912 ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() { } logger_->log_info("Reconnecting to %s", uri_); - int ret = MQTTAsync_connect(client_, _opts); + if (MQTTAsync_isConnected(client_)) { +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); +return; + } + const int ret = MQTTAsync_connect(client_, _opts); + MQTTProperties_free(_props); if (ret != MQTTASYNC_SUCCESS) { -logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret); +logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret); +return; } + + // wait until connection succeeds or fails + connect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const { + conn_opts.cleanstart = getCleanStart(); + + { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL; +property.value.integer4 = gsl::narrow(getSessionExpiryInterval().count()); +MQTTProperties_add(_props, ); + } Review Comment: To me it looks like `property` is copied by value by `MQTTProperties_add()`: https://github.com/eclipse/paho.mqtt.c/blob/master/src/MQTTProperties.c#L127 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1039819121 ## libminifi/include/utils/Enum.h: ## @@ -39,6 +39,7 @@ namespace utils { constexpr Clazz(Type value = static_cast(-1)) : value_{value} {} \ explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \ explicit Clazz(const char* str) : value_{parse(str).value_} {} \ +explicit Clazz(std::nullptr_t) = delete; \ Review Comment: Previously initializing with `0` was allowed, with the `const char*` constructor. This could be misleading, now it's not allowed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1039826380 ## PROCESSORS.md: ## @@ -2364,6 +2380,8 @@ In the list below, the names of required properties appear in bold. Any other pr ### Description Routes FlowFiles based on their Attributes using the Attribute Expression Language. +Any number of user-defined dynamic properties can be added, which all support the Attribute Expression Language. Relationships matching the name of the properties will be added. +FlowFiles will be routed to all the relationships whose matching property evaluates to "true". Unmatched FlowFiles will be routed for "unmatched" relationship, while failed ones to "failure". Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1039825741 ## PROCESSORS.md: ## @@ -337,28 +337,34 @@ This Processor gets the contents of a FlowFile from a MQTT broker for a specifie In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. -| Name | Default Value | Allowable Values | Description | -|---|---|--|-| -| **Broker URI**| | | The URI to use to connect to the MQTT broker | -| **Topic** | | | The topic to subscribe to | -| Client ID | | | MQTT client ID to use | -| Quality of Service| 0 | | The Quality of Service (QoS) to receive the message with. Accepts three values '0', '1' and '2' | -| Connection Timeout| 30 sec| | Maximum time interval the client will wait for the network connection to the MQTT broker | -| Keep Alive Interval | 60 sec| | Defines the maximum time interval between messages being sent to the broker | -| Max Flow Segment Size | | | Maximum flow content payload segment size for the MQTT record | -| Last Will Topic | | | The topic to send the client's Last Will to. If the Last Will topic is not set then a Last Will will not be sent| -| Last Will Message | | | The message to send as the client's Last Will. If the Last Will Message is empty, Last Will will be deleted from the broker | -| Last Will QoS | 0 | | The Quality of Service (QoS) to send the last will with. Accepts three values '0', '1' and '2' | -| Last Will Retain | false | | Whether to retain the client's Last Will | -| Security Protocol | | | Protocol used to communicate with brokers | -| Security CA | | | File or directory path to CA certificate(s) for verifying the broker's key | -| Security Cert | | | Path to client's public key (PEM) used for authentication | -| Security Private Key | | | Path to client's private key (PEM) used for authentication | -| Security Pass Phrase | | | Private key passphrase | -| Username | | | Username to use when connecting to the broker | -| Password | | | Password to use when connecting to the broker | -| Clean Session | true | | Whether to start afresh rather than remembering previous subscriptions | -| Queue Max Message | 1000 | | Maximum number of messages allowed on the received MQTT queue | +| Name| Default Value | Allowable Values| Description |
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1039823934 ## libminifi/include/utils/Enum.h: ## @@ -127,7 +128,7 @@ namespace utils { #define SMART_ENUM(Clazz, ...) \ struct Clazz { \ using Base = ::org::apache::nifi::minifi::utils::EnumBase; \ -enum Type { \ +enum Type : int { \ Review Comment: Since C++17, an enumeration can be initialized from an integer without a cast. For unscoped enums, only if underlying type is fixed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1039819121 ## libminifi/include/utils/Enum.h: ## @@ -39,6 +39,7 @@ namespace utils { constexpr Clazz(Type value = static_cast(-1)) : value_{value} {} \ explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} {} \ explicit Clazz(const char* str) : value_{parse(str).value_} {} \ +explicit Clazz(std::nullptr_t) = delete; \ Review Comment: Previously initializing with `0` was allowed, with the `const char*` constructor. This could be misleading, not it's not allowed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1021602103 ## extensions/mqtt/processors/PublishMQTT.cpp: ## @@ -34,76 +34,267 @@ void PublishMQTT::initialize() { setSupportedRelationships(relationships()); } -void PublishMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void PublishMQTT::readProperties(const std::shared_ptr& context) { + if (!context->getProperty(Topic).has_value()) { +logger_->log_error("PublishMQTT: could not get Topic"); + } + if (const auto retain_opt = context->getProperty(Retain)) { retain_ = *retain_opt; } logger_->log_debug("PublishMQTT: Retain [%d]", retain_); - AbstractMQTTProcessor::onSchedule(context, factory); + if (const auto message_expiry_interval = context->getProperty(MessageExpiryInterval)) { +message_expiry_interval_ = std::chrono::duration_cast(message_expiry_interval->getMilliseconds()); +logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()}); + } + + in_flight_message_counter_.setMqttVersion(mqtt_version_); + in_flight_message_counter_.setQoS(qos_); } -void PublishMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); +void PublishMQTT::onTriggerImpl(const std::shared_ptr& context, const std::shared_ptr ) { + std::shared_ptr flow_file = session->get(); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_); -yield(); + if (!flow_file) { return; } - std::shared_ptr flowFile = session->get(); + // broker's Receive Maximum can change after reconnect + in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(65535)); - if (!flowFile) { -return; - } - - PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, topic_, client_, gsl::narrow(qos_), retain_); - session->read(flowFile, std::ref(callback)); - if (callback.status_ < 0) { -logger_->log_error("Failed to send flow to MQTT topic %s", topic_); -session->transfer(flowFile, Failure); + const auto topic = getTopic(context, flow_file); + PublishMQTT::ReadCallback callback(this, flow_file, topic, getContentType(context, flow_file)); + session->read(flow_file, std::ref(callback)); + if (!callback.getSuccessStatus()) { +logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), topic, uri_); +session->transfer(flow_file, Failure); } else { -logger_->log_debug("Sent flow with length %d to MQTT topic %s", callback.read_size_, topic_); -session->transfer(flowFile, Success); +logger_->log_debug("Sent flow file [%s] with length %d to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), callback.getReadSize(), topic, uri_); +session->transfer(flow_file, Success); } } int64_t PublishMQTT::ReadCallback::operator()(const std::shared_ptr& stream) { - if (flow_size_ < max_seg_size_) -max_seg_size_ = flow_size_; - gsl_Expects(max_seg_size_ < gsl::narrow(std::numeric_limits::max())); - std::vector buffer(max_seg_size_); + if (flow_file_->getSize() > 268'435'455) { +processor_->logger_->log_error("Sending message failed because MQTT limit maximum packet size [268'435'455] is exceeded by FlowFile of [%" PRIu64 "]", flow_file_->getSize()); +success_status_ = false; +return -1; + } + + if (processor_->maximum_packet_size_.has_value() && flow_file_->getSize() > *(processor_->maximum_packet_size_)) { +processor_->logger_->log_error("Sending message failed because broker-requested maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%" PRIu64 "]", + *processor_->maximum_packet_size_, flow_file_->getSize()); +success_status_ = false; +return -1; + } + + std::vector buffer(flow_file_->getSize()); read_size_ = 0; - status_ = 0; - while (read_size_ < flow_size_) { -// MQTTClient_message::payloadlen is int, so we can't handle 2GB+ -const auto readRet = stream->read(buffer); + success_status_ = true; + while (read_size_ < flow_file_->getSize()) { Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1021602511 ## extensions/mqtt/processors/PublishMQTT.cpp: ## @@ -34,76 +34,267 @@ void PublishMQTT::initialize() { setSupportedRelationships(relationships()); } -void PublishMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void PublishMQTT::readProperties(const std::shared_ptr& context) { + if (!context->getProperty(Topic).has_value()) { +logger_->log_error("PublishMQTT: could not get Topic"); + } + if (const auto retain_opt = context->getProperty(Retain)) { retain_ = *retain_opt; } logger_->log_debug("PublishMQTT: Retain [%d]", retain_); - AbstractMQTTProcessor::onSchedule(context, factory); + if (const auto message_expiry_interval = context->getProperty(MessageExpiryInterval)) { +message_expiry_interval_ = std::chrono::duration_cast(message_expiry_interval->getMilliseconds()); +logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()}); + } + + in_flight_message_counter_.setMqttVersion(mqtt_version_); + in_flight_message_counter_.setQoS(qos_); } -void PublishMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); +void PublishMQTT::onTriggerImpl(const std::shared_ptr& context, const std::shared_ptr ) { + std::shared_ptr flow_file = session->get(); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_); -yield(); + if (!flow_file) { return; } - std::shared_ptr flowFile = session->get(); + // broker's Receive Maximum can change after reconnect + in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM)); - if (!flowFile) { -return; - } - - PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, topic_, client_, gsl::narrow(qos_), retain_); - session->read(flowFile, std::ref(callback)); - if (callback.status_ < 0) { -logger_->log_error("Failed to send flow to MQTT topic %s", topic_); -session->transfer(flowFile, Failure); + const auto topic = getTopic(context, flow_file); + PublishMQTT::ReadCallback callback(this, flow_file, topic, getContentType(context, flow_file)); + session->read(flow_file, std::ref(callback)); + if (!callback.getSuccessStatus()) { +logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), topic, uri_); Review Comment: Done, thanks for noticing it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1021156290 ## extensions/mqtt/processors/ConsumeMQTT.h: ## @@ -68,63 +77,82 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor { ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - void onSchedule(const std::shared_ptr , const std::shared_ptr ) override; - void onTrigger(const std::shared_ptr , const std::shared_ptr ) override; + static constexpr const char* const MQTT_TOPIC_ATTRIBUTE = "mqtt.topic"; + static constexpr const char* const MQTT_BROKER_ATTRIBUTE = "mqtt.broker"; + + void readProperties(const std::shared_ptr& context) override; + void onTriggerImpl(const std::shared_ptr& context, const std::shared_ptr& session) override; void initialize() override; private: - struct MQTTMessageDeleter { -void operator()(MQTTAsync_message* message) { - MQTTAsync_freeMessage(); + class WriteCallback { + public: +explicit WriteCallback(const SmartMessage& message) + : message_(message) { } - }; - void getReceivedMQTTMsg(std::deque>& msg_queue) { -std::unique_ptr message; -while (queue_.try_dequeue(message)) { - msg_queue.push_back(std::move(message)); +int64_t operator() (const std::shared_ptr& stream); + +[[nodiscard]] bool getSuccessStatus() const { + return success_status_; } - } - // MQTT async callback - static void subscriptionSuccess(void* context, MQTTAsync_successData* response) { -auto* processor = reinterpret_cast(context); -processor->onSubscriptionSuccess(response); - } + private: +const SmartMessage& message_; +bool success_status_ = true; + }; - // MQTT async callback - static void subscriptionFailure(void* context, MQTTAsync_failureData* response) { -auto* processor = reinterpret_cast(context); -processor->onSubscriptionFailure(response); - } + std::queue getReceivedMqttMessages(); + + // MQTT static async callbacks, calling their non-static counterparts with context being pointer to "this" + static void subscriptionSuccess(void* context, MQTTAsync_successData* response); + static void subscriptionSuccess5(void* context, MQTTAsync_successData5* response); + static void subscriptionFailure(void* context, MQTTAsync_failureData* response); + static void subscriptionFailure5(void* context, MQTTAsync_failureData5* response); - void onSubscriptionSuccess(MQTTAsync_successData* /*response*/) { -logger_->log_info("Successfully subscribed to MQTT topic %s on broker %s", topic_, uri_); + // MQTT non-static async callbacks + void onSubscriptionSuccess(); + void onSubscriptionFailure(MQTTAsync_failureData* response); + void onSubscriptionFailure5(MQTTAsync_failureData5* response); + void onMessageReceived(SmartMessage smart_message) override; + + void enqueueReceivedMQTTMsg(SmartMessage message); + void startupClient() override; + void checkProperties() override; + void checkBrokerLimitsImpl() override; + + void resolveTopicFromAlias(const std::unique_ptr& message, std::string& topic); + + bool getCleanSession() const override { +return clean_session_; } - void onSubscriptionFailure(MQTTAsync_failureData* response) { -logger_->log_error("Subscription failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response->code); -if (response->message != nullptr) { - logger_->log_error("Detailed reason for subscription failure: %s", response->message); -} + bool getCleanStart() const override { +return clean_start_; } - bool getCleanSession() const override { -return cleanSession_; + std::chrono::seconds getSessionExpiryInterval() const override { +return session_expiry_interval_; } - void onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) override; + void putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr& flow_file, const std::shared_ptr& session) const; + void fillAttributeFromContentType(const SmartMessage& message, const std::shared_ptr& flow_file, const std::shared_ptr& session) const; - void enqueueReceivedMQTTMsg(std::unique_ptr message); + void setMqtt5ConnectOptionsImpl(MQTTProperties& connect_props) const override; - bool startupClient() override; + std::string topic_; + bool clean_session_ = true; + bool clean_start_ = true; + std::chrono::seconds session_expiry_interval_{0}; + uint64_t max_queue_size_ = 1000; + std::string attribute_from_content_type_; - void checkProperties() override; + uint16_t topic_alias_maximum_{0}; + uint16_t receive_maximum_{65535}; Review Comment: I called it `MQTT_MAX_RECEIVE_MAXIMUM`, because this is the maximum possible value for the Receive Maximum MQTT property. This could either be defined by the client or the broker and it means the maximum number of unacknowledged messages they can tolerate at a time. -- This is an
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1021135374 ## extensions/mqtt/processors/PublishMQTT.cpp: ## @@ -34,76 +34,267 @@ void PublishMQTT::initialize() { setSupportedRelationships(relationships()); } -void PublishMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void PublishMQTT::readProperties(const std::shared_ptr& context) { + if (!context->getProperty(Topic).has_value()) { +logger_->log_error("PublishMQTT: could not get Topic"); + } + if (const auto retain_opt = context->getProperty(Retain)) { retain_ = *retain_opt; } logger_->log_debug("PublishMQTT: Retain [%d]", retain_); - AbstractMQTTProcessor::onSchedule(context, factory); + if (const auto message_expiry_interval = context->getProperty(MessageExpiryInterval)) { +message_expiry_interval_ = std::chrono::duration_cast(message_expiry_interval->getMilliseconds()); +logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", int64_t{message_expiry_interval_->count()}); + } + + in_flight_message_counter_.setMqttVersion(mqtt_version_); + in_flight_message_counter_.setQoS(qos_); } -void PublishMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); +void PublishMQTT::onTriggerImpl(const std::shared_ptr& context, const std::shared_ptr ) { + std::shared_ptr flow_file = session->get(); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not publish to MQTT broker because disconnected to %s", uri_); -yield(); + if (!flow_file) { return; } - std::shared_ptr flowFile = session->get(); + // broker's Receive Maximum can change after reconnect + in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(65535)); - if (!flowFile) { -return; - } - - PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, topic_, client_, gsl::narrow(qos_), retain_); - session->read(flowFile, std::ref(callback)); - if (callback.status_ < 0) { -logger_->log_error("Failed to send flow to MQTT topic %s", topic_); -session->transfer(flowFile, Failure); + const auto topic = getTopic(context, flow_file); + PublishMQTT::ReadCallback callback(this, flow_file, topic, getContentType(context, flow_file)); + session->read(flow_file, std::ref(callback)); + if (!callback.getSuccessStatus()) { +logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), topic, uri_); +session->transfer(flow_file, Failure); } else { -logger_->log_debug("Sent flow with length %d to MQTT topic %s", callback.read_size_, topic_); -session->transfer(flowFile, Success); +logger_->log_debug("Sent flow file [%s] with length %d to MQTT topic '%s' on broker %s", flow_file->getUUIDStr(), callback.getReadSize(), topic, uri_); +session->transfer(flow_file, Success); } } int64_t PublishMQTT::ReadCallback::operator()(const std::shared_ptr& stream) { - if (flow_size_ < max_seg_size_) -max_seg_size_ = flow_size_; - gsl_Expects(max_seg_size_ < gsl::narrow(std::numeric_limits::max())); - std::vector buffer(max_seg_size_); + if (flow_file_->getSize() > 268'435'455) { +processor_->logger_->log_error("Sending message failed because MQTT limit maximum packet size [268'435'455] is exceeded by FlowFile of [%" PRIu64 "]", flow_file_->getSize()); +success_status_ = false; +return -1; + } + + if (processor_->maximum_packet_size_.has_value() && flow_file_->getSize() > *(processor_->maximum_packet_size_)) { +processor_->logger_->log_error("Sending message failed because broker-requested maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%" PRIu64 "]", + *processor_->maximum_packet_size_, flow_file_->getSize()); +success_status_ = false; +return -1; + } + + std::vector buffer(flow_file_->getSize()); read_size_ = 0; - status_ = 0; - while (read_size_ < flow_size_) { -// MQTTClient_message::payloadlen is int, so we can't handle 2GB+ -const auto readRet = stream->read(buffer); + success_status_ = true; + while (read_size_ < flow_file_->getSize()) { Review Comment: Is it guaranteed that `InputStream::read()` will read through all the data? This interface method has many implementations. If any of them works similarly to `read` system call, then it has to be read in a loop checking we got all data we wanted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1020323673 ## extensions/mqtt/processors/ConsumeMQTT.cpp: ## @@ -34,111 +34,325 @@ void ConsumeMQTT::initialize() { setSupportedRelationships(relationships()); } -void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) { - if (queue_.size_approx() >= maxQueueSize_) { -logger_->log_warn("MQTT queue full"); +void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) { + if (queue_.size_approx() >= max_queue_size_) { +logger_->log_error("MQTT queue full"); return; } - if (gsl::narrow(message->payloadlen) > max_seg_size_) { -logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen); -message->payloadlen = gsl::narrow(max_seg_size_); - } - - logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen); + logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen); queue_.enqueue(std::move(message)); } -void ConsumeMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void ConsumeMQTT::readProperties(const std::shared_ptr& context) { + if (auto value = context->getProperty(Topic)) { +topic_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_); + if (const auto value = context->getProperty(CleanSession)) { -cleanSession_ = *value; -logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_); +clean_session_ = *value; } + logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_); + + if (const auto value = context->getProperty(CleanStart)) { +clean_start_ = *value; + } + logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_); + + if (const auto session_expiry_interval = context->getProperty(SessionExpiryInterval)) { +session_expiry_interval_ = std::chrono::duration_cast(session_expiry_interval->getMilliseconds()); + } + logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()}); if (const auto value = context->getProperty(QueueBufferMaxMessage)) { -maxQueueSize_ = *value; -logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_); +max_queue_size_ = *value; } + logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_); - // this connects to broker, so properties of this processor must be read before - AbstractMQTTProcessor::onSchedule(context, factory); -} + if (auto value = context->getProperty(AttributeFromContentType)) { +attribute_from_content_type_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_); -void ConsumeMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); + if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum)) { +topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum); + } + logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_); -yield(); -return; + if (const auto receive_maximum = context->getProperty(ReceiveMaximum)) { +receive_maximum_ = gsl::narrow(*receive_maximum); } + logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_); +} - std::deque> msg_queue; - getReceivedMQTTMsg(msg_queue); +void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& /*context*/, const std::shared_ptr& session) { + std::queue msg_queue = getReceivedMqttMessages(); while (!msg_queue.empty()) { const auto& message = msg_queue.front(); -std::shared_ptr processFlowFile = session->create(); -int write_status{}; -session->write(processFlowFile, [, _status](const std::shared_ptr& stream) -> int64_t { - if (message->payloadlen < 0) { -write_status = -1; -return -1; - } - const auto len = stream->write(reinterpret_cast(message->payload), gsl::narrow(message->payloadlen)); - if (io::isError(len)) { -write_status = -1; -return -1; - } - return gsl::narrow(len); -}); -if (write_status < 0) { - logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr()); - session->remove(processFlowFile); +std::shared_ptr flow_file = session->create(); +WriteCallback write_callback(message); +session->write(flow_file, write_callback); +if (!write_callback.getSuccessStatus()) { + logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", flow_file->getUUIDStr()); Review Comment: Good points, done both! -- This is an automated message
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1020313074 ## extensions/mqtt/processors/ConsumeMQTT.cpp: ## @@ -34,111 +34,325 @@ void ConsumeMQTT::initialize() { setSupportedRelationships(relationships()); } -void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) { - if (queue_.size_approx() >= maxQueueSize_) { -logger_->log_warn("MQTT queue full"); +void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) { + if (queue_.size_approx() >= max_queue_size_) { +logger_->log_error("MQTT queue full"); return; } - if (gsl::narrow(message->payloadlen) > max_seg_size_) { -logger_->log_debug("MQTT message was truncated while enqueuing, original length: %d", message->payloadlen); -message->payloadlen = gsl::narrow(max_seg_size_); - } - - logger_->log_debug("enqueuing MQTT message with length %d", message->payloadlen); + logger_->log_debug("enqueuing MQTT message with length %d", message.contents->payloadlen); queue_.enqueue(std::move(message)); } -void ConsumeMQTT::onSchedule(const std::shared_ptr , const std::shared_ptr ) { +void ConsumeMQTT::readProperties(const std::shared_ptr& context) { + if (auto value = context->getProperty(Topic)) { +topic_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_); + if (const auto value = context->getProperty(CleanSession)) { -cleanSession_ = *value; -logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_); +clean_session_ = *value; } + logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_); + + if (const auto value = context->getProperty(CleanStart)) { +clean_start_ = *value; + } + logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_); + + if (const auto session_expiry_interval = context->getProperty(SessionExpiryInterval)) { +session_expiry_interval_ = std::chrono::duration_cast(session_expiry_interval->getMilliseconds()); + } + logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", int64_t{session_expiry_interval_.count()}); if (const auto value = context->getProperty(QueueBufferMaxMessage)) { -maxQueueSize_ = *value; -logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", maxQueueSize_); +max_queue_size_ = *value; } + logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", max_queue_size_); - // this connects to broker, so properties of this processor must be read before - AbstractMQTTProcessor::onSchedule(context, factory); -} + if (auto value = context->getProperty(AttributeFromContentType)) { +attribute_from_content_type_ = std::move(*value); + } + logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", attribute_from_content_type_); -void ConsumeMQTT::onTrigger(const std::shared_ptr& /*context*/, const std::shared_ptr ) { - // reconnect if needed - reconnect(); + if (const auto topic_alias_maximum = context->getProperty(TopicAliasMaximum)) { +topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum); + } + logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", topic_alias_maximum_); - if (!MQTTAsync_isConnected(client_)) { -logger_->log_error("Could not consume from MQTT broker because disconnected to %s", uri_); -yield(); -return; + if (const auto receive_maximum = context->getProperty(ReceiveMaximum)) { +receive_maximum_ = gsl::narrow(*receive_maximum); } + logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", receive_maximum_); +} - std::deque> msg_queue; - getReceivedMQTTMsg(msg_queue); +void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& /*context*/, const std::shared_ptr& session) { + std::queue msg_queue = getReceivedMqttMessages(); while (!msg_queue.empty()) { const auto& message = msg_queue.front(); -std::shared_ptr processFlowFile = session->create(); -int write_status{}; -session->write(processFlowFile, [, _status](const std::shared_ptr& stream) -> int64_t { - if (message->payloadlen < 0) { -write_status = -1; -return -1; - } - const auto len = stream->write(reinterpret_cast(message->payload), gsl::narrow(message->payloadlen)); - if (io::isError(len)) { -write_status = -1; -return -1; - } - return gsl::narrow(len); -}); -if (write_status < 0) { - logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", processFlowFile->getUUIDStr()); - session->remove(processFlowFile); +std::shared_ptr flow_file = session->create(); +WriteCallback write_callback(message); +session->write(flow_file, write_callback); +if (!write_callback.getSuccessStatus()) { + logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", flow_file->getUUIDStr()); + session->remove(flow_file); } else { -
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1020294046 ## extensions/mqtt/processors/ConsumeMQTT.h: ## @@ -35,27 +36,35 @@ namespace org::apache::nifi::minifi::processors { -#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic" -#define MQTT_BROKER_ATTRIBUTE "mqtt.broker" - class ConsumeMQTT : public processors::AbstractMQTTProcessor { public: explicit ConsumeMQTT(const std::string& name, const utils::Identifier& uuid = {}) : processors::AbstractMQTTProcessor(name, uuid) { -maxQueueSize_ = 100; } EXTENSIONAPI static constexpr const char* Description = "This Processor gets the contents of a FlowFile from a MQTT broker for a specified topic. " "The the payload of the MQTT message becomes content of a FlowFile"; + EXTENSIONAPI static const core::Property Topic; EXTENSIONAPI static const core::Property CleanSession; + EXTENSIONAPI static const core::Property CleanStart; + EXTENSIONAPI static const core::Property SessionExpiryInterval; EXTENSIONAPI static const core::Property QueueBufferMaxMessage; + EXTENSIONAPI static const core::Property AttributeFromContentType; + EXTENSIONAPI static const core::Property TopicAliasMaximum; + EXTENSIONAPI static const core::Property ReceiveMaximum; static auto properties() { -return utils::array_cat(AbstractMQTTProcessor::properties(), std::array{ +return utils::array_cat(AbstractMQTTProcessor::basicProperties(), std::array{ Review Comment: Yes, the order looks better, might make sense when on a web UI. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1020292687 ## extensions/mqtt/processors/ConsumeMQTT.h: ## @@ -68,63 +77,82 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor { ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - void onSchedule(const std::shared_ptr , const std::shared_ptr ) override; - void onTrigger(const std::shared_ptr , const std::shared_ptr ) override; + static constexpr const char* const MQTT_TOPIC_ATTRIBUTE = "mqtt.topic"; + static constexpr const char* const MQTT_BROKER_ATTRIBUTE = "mqtt.broker"; + + void readProperties(const std::shared_ptr& context) override; + void onTriggerImpl(const std::shared_ptr& context, const std::shared_ptr& session) override; void initialize() override; private: - struct MQTTMessageDeleter { -void operator()(MQTTAsync_message* message) { - MQTTAsync_freeMessage(); + class WriteCallback { + public: +explicit WriteCallback(const SmartMessage& message) + : message_(message) { } - }; - void getReceivedMQTTMsg(std::deque>& msg_queue) { -std::unique_ptr message; -while (queue_.try_dequeue(message)) { - msg_queue.push_back(std::move(message)); +int64_t operator() (const std::shared_ptr& stream); + +[[nodiscard]] bool getSuccessStatus() const { + return success_status_; } - } - // MQTT async callback - static void subscriptionSuccess(void* context, MQTTAsync_successData* response) { -auto* processor = reinterpret_cast(context); -processor->onSubscriptionSuccess(response); - } + private: +const SmartMessage& message_; +bool success_status_ = true; + }; - // MQTT async callback - static void subscriptionFailure(void* context, MQTTAsync_failureData* response) { -auto* processor = reinterpret_cast(context); -processor->onSubscriptionFailure(response); - } + std::queue getReceivedMqttMessages(); + + // MQTT static async callbacks, calling their non-static counterparts with context being pointer to "this" + static void subscriptionSuccess(void* context, MQTTAsync_successData* response); + static void subscriptionSuccess5(void* context, MQTTAsync_successData5* response); + static void subscriptionFailure(void* context, MQTTAsync_failureData* response); + static void subscriptionFailure5(void* context, MQTTAsync_failureData5* response); - void onSubscriptionSuccess(MQTTAsync_successData* /*response*/) { -logger_->log_info("Successfully subscribed to MQTT topic %s on broker %s", topic_, uri_); + // MQTT non-static async callbacks + void onSubscriptionSuccess(); + void onSubscriptionFailure(MQTTAsync_failureData* response); + void onSubscriptionFailure5(MQTTAsync_failureData5* response); + void onMessageReceived(SmartMessage smart_message) override; + + void enqueueReceivedMQTTMsg(SmartMessage message); + void startupClient() override; + void checkProperties() override; + void checkBrokerLimitsImpl() override; + + void resolveTopicFromAlias(const std::unique_ptr& message, std::string& topic); + + bool getCleanSession() const override { +return clean_session_; } - void onSubscriptionFailure(MQTTAsync_failureData* response) { -logger_->log_error("Subscription failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response->code); -if (response->message != nullptr) { - logger_->log_error("Detailed reason for subscription failure: %s", response->message); -} + bool getCleanStart() const override { +return clean_start_; } - bool getCleanSession() const override { -return cleanSession_; + std::chrono::seconds getSessionExpiryInterval() const override { +return session_expiry_interval_; } - void onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) override; + void putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr& flow_file, const std::shared_ptr& session) const; + void fillAttributeFromContentType(const SmartMessage& message, const std::shared_ptr& flow_file, const std::shared_ptr& session) const; - void enqueueReceivedMQTTMsg(std::unique_ptr message); + void setMqtt5ConnectOptionsImpl(MQTTProperties& connect_props) const override; - bool startupClient() override; + std::string topic_; + bool clean_session_ = true; + bool clean_start_ = true; + std::chrono::seconds session_expiry_interval_{0}; + uint64_t max_queue_size_ = 1000; + std::string attribute_from_content_type_; - void checkProperties() override; + uint16_t topic_alias_maximum_{0}; + uint16_t receive_maximum_{65535}; Review Comment: It's match, however this is not defined by C++, but MQTT. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019320537 ## extensions/mqtt/processors/ConsumeMQTT.h: ## @@ -125,14 +119,14 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor { void onSubscriptionSuccess(); void onSubscriptionFailure(MQTTAsync_failureData* response); void onSubscriptionFailure5(MQTTAsync_failureData5* response); - void onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) override; + void onMessageReceived(std::string topic, std::unique_ptr message) override; Review Comment: Moved struct definition to `AbstractMQTTProcessor`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019200246 ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -163,23 +200,261 @@ void AbstractMQTTProcessor::reconnect() { } logger_->log_info("Reconnecting to %s", uri_); - int ret = MQTTAsync_connect(client_, _opts); + if (MQTTAsync_isConnected(client_)) { +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); +return; + } + const int ret = MQTTAsync_connect(client_, _opts); + MQTTProperties_free(_props); if (ret != MQTTASYNC_SUCCESS) { -logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret); +logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret); +return; } + + // wait until connection succeeds or fails + connect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const { + conn_opts.cleanstart = getCleanStart(); + + { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL; +property.value.integer4 = gsl::narrow(getSessionExpiryInterval().count()); +MQTTProperties_add(_props, ); + } + + if (!last_will_content_type_.empty()) { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE; +property.value.data.len = last_will_content_type_.length(); +property.value.data.data = const_cast(last_will_content_type_.data()); +MQTTProperties_add(_props, ); + } + + conn_opts.willProperties = _props; + + setMqtt5ConnectOptionsImpl(connect_props); +} + +void AbstractMQTTProcessor::onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) { + // read lock + std::shared_lock client_lock{client_mutex_}; + if (client_ == nullptr) { +// we are shutting down +return; + } + + // reconnect if needed + reconnect(); + + if (!MQTTAsync_isConnected(client_)) { +logger_->log_error("Could not work with MQTT broker because disconnected to %s", uri_); +yield(); +return; + } + + onTriggerImpl(context, session); } void AbstractMQTTProcessor::freeResources() { - if (client_ && MQTTAsync_isConnected(client_)) { -MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer; -disconnect_options.context = this; -disconnect_options.onSuccess = disconnectionSuccess; -disconnect_options.onFailure = disconnectionFailure; -disconnect_options.timeout = gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count()); -MQTTAsync_disconnect(client_, _options); + // write lock + std::lock_guard client_lock{client_mutex_}; + + if (!client_) { +return; } - if (client_) { -MQTTAsync_destroy(_); + + disconnect(); + + MQTTAsync_destroy(_); +} + +void AbstractMQTTProcessor::disconnect() { + if (!MQTTAsync_isConnected(client_)) { +return; + } + + MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer; + std::packaged_task disconnect_finished_task( + [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) { +onDisconnectFinished(success_data, success_data_5, failure_data, failure_data_5); + }); + disconnect_options.context = _finished_task; + + if (mqtt_version_.value() == MqttVersions::V_5_0) { +disconnect_options.onSuccess5 = connectionSuccess5; +disconnect_options.onFailure5 = connectionFailure5; + } else { +disconnect_options.onSuccess = connectionSuccess; +disconnect_options.onFailure = connectionFailure; + } + + disconnect_options.timeout = gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count()); + + const int ret = MQTTAsync_disconnect(client_, _options); + if (ret != MQTTASYNC_SUCCESS) { +logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with error code [%d]", uri_, ret); +return; + } + + // wait until connection succeeds or fails + disconnect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) { + auto readProperty = [response] (MQTTPropertyCodes property_code, auto& out_var) { +// defined by Paho MQTT C library +static const int failure_code = -999; + +const int value = MQTTProperties_getNumericValue(>properties, property_code); +if (value != failure_code) { + if constexpr (std::is_same_v&>) { +out_var = std::chrono::seconds(value); + } else { +out_var = gsl::narrow::type::value_type>(value); Review Comment: Yes, we can! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019199181 ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -163,23 +200,261 @@ void AbstractMQTTProcessor::reconnect() { } logger_->log_info("Reconnecting to %s", uri_); - int ret = MQTTAsync_connect(client_, _opts); + if (MQTTAsync_isConnected(client_)) { +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); +return; + } + const int ret = MQTTAsync_connect(client_, _opts); + MQTTProperties_free(_props); if (ret != MQTTASYNC_SUCCESS) { -logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret); +logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret); +return; } + + // wait until connection succeeds or fails + connect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const { + conn_opts.cleanstart = getCleanStart(); + + { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL; +property.value.integer4 = gsl::narrow(getSessionExpiryInterval().count()); +MQTTProperties_add(_props, ); + } + + if (!last_will_content_type_.empty()) { +MQTTProperty property; +property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE; +property.value.data.len = last_will_content_type_.length(); +property.value.data.data = const_cast(last_will_content_type_.data()); +MQTTProperties_add(_props, ); + } + + conn_opts.willProperties = _props; + + setMqtt5ConnectOptionsImpl(connect_props); +} + +void AbstractMQTTProcessor::onTrigger(const std::shared_ptr& context, const std::shared_ptr& session) { + // read lock + std::shared_lock client_lock{client_mutex_}; + if (client_ == nullptr) { +// we are shutting down +return; + } + + // reconnect if needed + reconnect(); + + if (!MQTTAsync_isConnected(client_)) { +logger_->log_error("Could not work with MQTT broker because disconnected to %s", uri_); +yield(); +return; + } + + onTriggerImpl(context, session); } void AbstractMQTTProcessor::freeResources() { - if (client_ && MQTTAsync_isConnected(client_)) { -MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer; -disconnect_options.context = this; -disconnect_options.onSuccess = disconnectionSuccess; -disconnect_options.onFailure = disconnectionFailure; -disconnect_options.timeout = gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count()); -MQTTAsync_disconnect(client_, _options); + // write lock + std::lock_guard client_lock{client_mutex_}; + + if (!client_) { +return; } - if (client_) { -MQTTAsync_destroy(_); + + disconnect(); + + MQTTAsync_destroy(_); +} + +void AbstractMQTTProcessor::disconnect() { + if (!MQTTAsync_isConnected(client_)) { +return; + } + + MQTTAsync_disconnectOptions disconnect_options = MQTTAsync_disconnectOptions_initializer; + std::packaged_task disconnect_finished_task( + [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) { +onDisconnectFinished(success_data, success_data_5, failure_data, failure_data_5); + }); + disconnect_options.context = _finished_task; + + if (mqtt_version_.value() == MqttVersions::V_5_0) { +disconnect_options.onSuccess5 = connectionSuccess5; +disconnect_options.onFailure5 = connectionFailure5; + } else { +disconnect_options.onSuccess = connectionSuccess; +disconnect_options.onFailure = connectionFailure; + } + + disconnect_options.timeout = gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count()); + + const int ret = MQTTAsync_disconnect(client_, _options); + if (ret != MQTTASYNC_SUCCESS) { +logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with error code [%d]", uri_, ret); +return; + } + + // wait until connection succeeds or fails + disconnect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) { + auto readProperty = [response] (MQTTPropertyCodes property_code, auto& out_var) { +// defined by Paho MQTT C library +static const int failure_code = -999; Review Comment: Thanks, did it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019193763 ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -137,19 +148,45 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptrlog_error("MQTT client is not existing while trying to reconnect"); -return; +throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect"); } if (MQTTAsync_isConnected(client_)) { -logger_->log_info("Already connected to %s, no need to reconnect", uri_); +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); return; } - MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + + MQTTAsync_connectOptions conn_opts; + MQTTProperties connect_props = MQTTProperties_initializer; + MQTTProperties will_props = MQTTProperties_initializer; + + if (mqtt_version_.value() == MqttVersions::V_5_0) { +conn_opts = MQTTAsync_connectOptions_initializer5; +conn_opts.onSuccess5 = connectionSuccess5; +conn_opts.onFailure5 = connectionFailure5; +conn_opts.connectProperties = _props; + } else { +conn_opts = MQTTAsync_connectOptions_initializer; +conn_opts.onSuccess = connectionSuccess; +conn_opts.onFailure = connectionFailure; + } + + if (mqtt_version_.value() == MqttVersions::V_3_1_0) { +conn_opts.MQTTVersion = MQTTVERSION_3_1; + } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) { +conn_opts.MQTTVersion = MQTTVERSION_3_1_1; Review Comment: `MQTTAsync_connectOptions_initializer5` does it on line 163. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019193763 ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -137,19 +148,45 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptrlog_error("MQTT client is not existing while trying to reconnect"); -return; +throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect"); } if (MQTTAsync_isConnected(client_)) { -logger_->log_info("Already connected to %s, no need to reconnect", uri_); +logger_->log_debug("Already connected to %s, no need to reconnect", uri_); return; } - MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; + + MQTTAsync_connectOptions conn_opts; + MQTTProperties connect_props = MQTTProperties_initializer; + MQTTProperties will_props = MQTTProperties_initializer; + + if (mqtt_version_.value() == MqttVersions::V_5_0) { +conn_opts = MQTTAsync_connectOptions_initializer5; +conn_opts.onSuccess5 = connectionSuccess5; +conn_opts.onFailure5 = connectionFailure5; +conn_opts.connectProperties = _props; + } else { +conn_opts = MQTTAsync_connectOptions_initializer; +conn_opts.onSuccess = connectionSuccess; +conn_opts.onFailure = connectionFailure; + } + + if (mqtt_version_.value() == MqttVersions::V_3_1_0) { +conn_opts.MQTTVersion = MQTTVERSION_3_1; + } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) { +conn_opts.MQTTVersion = MQTTVERSION_3_1_1; Review Comment: MQTTAsync_connectOptions_initializer5 does it on line 163. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019185396 ## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ## @@ -20,51 +20,49 @@ #include #include "utils/StringUtils.h" +#include "utils/ProcessorConfigUtils.h" #include "core/ProcessContext.h" namespace org::apache::nifi::minifi::processors { void AbstractMQTTProcessor::onSchedule(const std::shared_ptr , const std::shared_ptr& /*factory*/) { if (auto value = context->getProperty(BrokerURI)) { uri_ = std::move(*value); -logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_); } + logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_); + + mqtt_version_ = MqttVersions{utils::parsePropertyWithAllowableValuesOrThrow(*context, MqttVersion.getName(), MqttVersions::values())}; Review Comment: Thanks, used it! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5
adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019170650 ## extensions/librdkafka/PublishKafka.cpp: ## @@ -266,8 +266,8 @@ class ReadCallback { status_ = 0; called_ = true; -gsl_Expects(max_seg_size_ != 0 || (flow_size_ == 0 && "max_seg_size_ == 0 implies flow_size_ == 0")); -// ^^ therefore checking max_seg_size_ == 0 handles both division by zero and flow_size_ == 0 cases +gsl_Expects(max_seg_size_ != 0 || (flow_size_ == 0 && "max_message_size_ == 0 implies flow_size_ == 0")); +// ^^ therefore checking max_message_size_ == 0 handles both division by zero and flow_size_ == 0 cases Review Comment: I think it was. I used CLion's rename feature and it affected this file by accident. Restoring. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org