adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1071202236
########## extensions/mqtt/processors/AbstractMQTTProcessor.cpp: ########## @@ -137,49 +148,317 @@ void AbstractMQTTProcessor::onSchedule(const std::shared_ptr<core::ProcessContex void AbstractMQTTProcessor::reconnect() { if (!client_) { - logger_->log_error("MQTT client is not existing while trying to reconnect"); + throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT client is not existing while trying to reconnect"); + } + if (MQTTAsync_isConnected(client_)) { + logger_->log_debug("Already connected to %s, no need to reconnect", uri_); return; } + + MQTTAsync_connectOptions connect_options; + MQTTProperties connect_properties = MQTTProperties_initializer; + MQTTProperties will_properties = MQTTProperties_initializer; + + ConnectFinishedTask connect_finished_task( + [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* failure_data_5) { + onConnectFinished(success_data, success_data_5, failure_data, failure_data_5); + }); + + setConnectOptions(connect_options, connect_properties, will_properties, connect_finished_task); + + logger_->log_info("Reconnecting to %s", uri_); if (MQTTAsync_isConnected(client_)) { - logger_->log_info("Already connected to %s, no need to reconnect", uri_); + logger_->log_debug("Already connected to %s, no need to reconnect", uri_); + return; + } + const int ret = MQTTAsync_connect(client_, &connect_options); + MQTTProperties_free(&connect_properties); + if (ret != MQTTASYNC_SUCCESS) { + logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error code [%d]", uri_, ret); return; } - MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; - conn_opts.keepAliveInterval = gsl::narrow<int>(keep_alive_interval_.count()); - conn_opts.cleansession = getCleanSession(); - conn_opts.context = this; - conn_opts.onSuccess = connectionSuccess; - conn_opts.onFailure = connectionFailure; - conn_opts.connectTimeout = gsl::narrow<int>(connection_timeout_.count()); + + // wait until connection succeeds or fails + connect_finished_task.get_future().get(); +} + +void AbstractMQTTProcessor::setConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties, + MQTTProperties& will_properties, const ConnectFinishedTask& connect_finished_task) const { + if (mqtt_version_.value() == MqttVersions::V_5_0) { + setMqtt5ConnectOptions(connect_options, connect_properties, will_properties); + } else { + setMqtt3ConnectOptions(connect_options); + } + + connect_options.context = const_cast<ConnectFinishedTask*>(&connect_finished_task); + connect_options.connectTimeout = gsl::narrow<int>(connection_timeout_.count()); + connect_options.keepAliveInterval = gsl::narrow<int>(keep_alive_interval_.count()); if (!username_.empty()) { - conn_opts.username = username_.c_str(); - conn_opts.password = password_.c_str(); + connect_options.username = username_.c_str(); + connect_options.password = password_.c_str(); } if (sslOpts_) { - conn_opts.ssl = &*sslOpts_; + connect_options.ssl = const_cast<MQTTAsync_SSLOptions*>(&*sslOpts_); } if (last_will_) { - conn_opts.will = &*last_will_; + connect_options.will = const_cast<MQTTAsync_willOptions*>(&*last_will_); } +} - logger_->log_info("Reconnecting to %s", uri_); - int ret = MQTTAsync_connect(client_, &conn_opts); - if (ret != MQTTASYNC_SUCCESS) { - logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, ret); +void AbstractMQTTProcessor::setMqtt3ConnectOptions(MQTTAsync_connectOptions& connect_options) const { + connect_options = MQTTAsync_connectOptions_initializer; + connect_options.onSuccess = connectionSuccess; + connect_options.onFailure = connectionFailure; + connect_options.cleansession = getCleanSession(); + + if (mqtt_version_.value() == MqttVersions::V_3_1_0) { + connect_options.MQTTVersion = MQTTVERSION_3_1; + } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) { + connect_options.MQTTVersion = MQTTVERSION_3_1_1; + } +} + +void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& connect_options, MQTTProperties& connect_properties, MQTTProperties& will_properties) const { + connect_options = MQTTAsync_connectOptions_initializer5; + connect_options.onSuccess5 = connectionSuccess5; + connect_options.onFailure5 = connectionFailure5; + connect_options.connectProperties = &connect_properties; + + connect_options.cleanstart = getCleanStart(); + + { + MQTTProperty property; + property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL; + property.value.integer4 = gsl::narrow<int>(getSessionExpiryInterval().count()); Review Comment: That does not, but MQTT 5 specs does: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048 "Followed by the Four Byte Integer representing the Session Expiry Interval in seconds." But maybe it should be `unsigned` instead of `signed int`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org