szaszm commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1086598334


##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -137,49 +148,317 @@ void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContex
 
 void AbstractMQTTProcessor::reconnect() {
   if (!client_) {
-    logger_->log_error("MQTT client is not existing while trying to 
reconnect");
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 
client is not existing while trying to reconnect");
+  }
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
     return;
   }
+
+  MQTTAsync_connectOptions connect_options;
+  MQTTProperties connect_properties = MQTTProperties_initializer;
+  MQTTProperties will_properties = MQTTProperties_initializer;
+
+  ConnectFinishedTask connect_finished_task(
+          [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* 
success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* 
failure_data_5) {
+            onConnectFinished(success_data, success_data_5, failure_data, 
failure_data_5);
+          });
+
+  setConnectOptions(connect_options, connect_properties, will_properties, 
connect_finished_task);
+
+  logger_->log_info("Reconnecting to %s", uri_);
   if (MQTTAsync_isConnected(client_)) {
-    logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &connect_options);
+  MQTTProperties_free(&connect_properties);
+  if (ret != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
     return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
-  conn_opts.keepAliveInterval = gsl::narrow<int>(keep_alive_interval_.count());
-  conn_opts.cleansession = getCleanSession();
-  conn_opts.context = this;
-  conn_opts.onSuccess = connectionSuccess;
-  conn_opts.onFailure = connectionFailure;
-  conn_opts.connectTimeout = gsl::narrow<int>(connection_timeout_.count());
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setConnectOptions(MQTTAsync_connectOptions& 
connect_options, MQTTProperties& connect_properties,
+                                              MQTTProperties& will_properties, 
const ConnectFinishedTask& connect_finished_task) const {
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    setMqtt5ConnectOptions(connect_options, connect_properties, 
will_properties);
+  } else {
+    setMqtt3ConnectOptions(connect_options);
+  }
+
+  connect_options.context = 
const_cast<ConnectFinishedTask*>(&connect_finished_task);
+  connect_options.connectTimeout = 
gsl::narrow<int>(connection_timeout_.count());
+  connect_options.keepAliveInterval = 
gsl::narrow<int>(keep_alive_interval_.count());
   if (!username_.empty()) {
-    conn_opts.username = username_.c_str();
-    conn_opts.password = password_.c_str();
+    connect_options.username = username_.c_str();
+    connect_options.password = password_.c_str();
   }
   if (sslOpts_) {
-    conn_opts.ssl = &*sslOpts_;
+    connect_options.ssl = const_cast<MQTTAsync_SSLOptions*>(&*sslOpts_);
   }
   if (last_will_) {
-    conn_opts.will = &*last_will_;
+    connect_options.will = const_cast<MQTTAsync_willOptions*>(&*last_will_);

Review Comment:
   It seems to me that we can get rid of the `const_cast`s by making this 
function non-`const` and taking `connect_finished_task` by mutable reference. 
`const` doesn't guarantee any safety anyway if it's cast away at the usage 
site.
   One could argue that it promises that the function will not modify the class 
state, but then I would cast the `const` back on `connect_finished_task` in 
the connection success/failure callbacks. And for the data members, we need to 
rely on a promise that the API doesn't write to these options.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to