szaszm commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1066925909


##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -137,49 +148,317 @@ void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContex
 
 void AbstractMQTTProcessor::reconnect() {
   if (!client_) {
-    logger_->log_error("MQTT client is not existing while trying to 
reconnect");
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 
client is not existing while trying to reconnect");
+  }
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
     return;
   }
+
+  MQTTAsync_connectOptions connect_options;
+  MQTTProperties connect_properties = MQTTProperties_initializer;
+  MQTTProperties will_properties = MQTTProperties_initializer;
+
+  ConnectFinishedTask connect_finished_task(
+          [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* 
success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* 
failure_data_5) {
+            onConnectFinished(success_data, success_data_5, failure_data, 
failure_data_5);
+          });
+
+  setConnectOptions(connect_options, connect_properties, will_properties, 
connect_finished_task);
+
+  logger_->log_info("Reconnecting to %s", uri_);
   if (MQTTAsync_isConnected(client_)) {
-    logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &connect_options);
+  MQTTProperties_free(&connect_properties);
+  if (ret != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
     return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
-  conn_opts.keepAliveInterval = gsl::narrow<int>(keep_alive_interval_.count());
-  conn_opts.cleansession = getCleanSession();
-  conn_opts.context = this;
-  conn_opts.onSuccess = connectionSuccess;
-  conn_opts.onFailure = connectionFailure;
-  conn_opts.connectTimeout = gsl::narrow<int>(connection_timeout_.count());
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setConnectOptions(MQTTAsync_connectOptions& 
connect_options, MQTTProperties& connect_properties,
+                                              MQTTProperties& will_properties, 
const ConnectFinishedTask& connect_finished_task) const {
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    setMqtt5ConnectOptions(connect_options, connect_properties, 
will_properties);
+  } else {
+    setMqtt3ConnectOptions(connect_options);
+  }
+
+  connect_options.context = 
const_cast<ConnectFinishedTask*>(&connect_finished_task);
+  connect_options.connectTimeout = 
gsl::narrow<int>(connection_timeout_.count());
+  connect_options.keepAliveInterval = 
gsl::narrow<int>(keep_alive_interval_.count());
   if (!username_.empty()) {
-    conn_opts.username = username_.c_str();
-    conn_opts.password = password_.c_str();
+    connect_options.username = username_.c_str();
+    connect_options.password = password_.c_str();
   }
   if (sslOpts_) {
-    conn_opts.ssl = &*sslOpts_;
+    connect_options.ssl = const_cast<MQTTAsync_SSLOptions*>(&*sslOpts_);
   }
   if (last_will_) {
-    conn_opts.will = &*last_will_;
+    connect_options.will = const_cast<MQTTAsync_willOptions*>(&*last_will_);
   }
+}
 
-  logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
-  if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+void AbstractMQTTProcessor::setMqtt3ConnectOptions(MQTTAsync_connectOptions& 
connect_options) const {
+  connect_options = MQTTAsync_connectOptions_initializer;
+  connect_options.onSuccess = connectionSuccess;
+  connect_options.onFailure = connectionFailure;
+  connect_options.cleansession = getCleanSession();
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+    connect_options.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+    connect_options.MQTTVersion = MQTTVERSION_3_1_1;
+  }
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
connect_options, MQTTProperties& connect_properties, MQTTProperties& 
will_properties) const {
+  connect_options = MQTTAsync_connectOptions_initializer5;
+  connect_options.onSuccess5 = connectionSuccess5;
+  connect_options.onFailure5 = connectionFailure5;
+  connect_options.connectProperties = &connect_properties;
+
+  connect_options.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = 
gsl::narrow<int>(getSessionExpiryInterval().count());

Review Comment:
   I think we should verify the type of this property. The Paho MQTT C docs 
don't document the type of each property, but they provide a function to query it: 
[MQTTProperty_getType](https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/_m_q_t_t_properties_8h.html#a7d30ad0520bc9b9366e700d4b493b173)



##########
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##########
@@ -137,49 +148,317 @@ void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptr<core::ProcessContex
 
 void AbstractMQTTProcessor::reconnect() {
   if (!client_) {
-    logger_->log_error("MQTT client is not existing while trying to 
reconnect");
+    throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 
client is not existing while trying to reconnect");
+  }
+  if (MQTTAsync_isConnected(client_)) {
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
     return;
   }
+
+  MQTTAsync_connectOptions connect_options;
+  MQTTProperties connect_properties = MQTTProperties_initializer;
+  MQTTProperties will_properties = MQTTProperties_initializer;
+
+  ConnectFinishedTask connect_finished_task(
+          [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* 
success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* 
failure_data_5) {
+            onConnectFinished(success_data, success_data_5, failure_data, 
failure_data_5);
+          });
+
+  setConnectOptions(connect_options, connect_properties, will_properties, 
connect_finished_task);
+
+  logger_->log_info("Reconnecting to %s", uri_);
   if (MQTTAsync_isConnected(client_)) {
-    logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+    logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+    return;
+  }
+  const int ret = MQTTAsync_connect(client_, &connect_options);
+  MQTTProperties_free(&connect_properties);
+  if (ret != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
     return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
-  conn_opts.keepAliveInterval = gsl::narrow<int>(keep_alive_interval_.count());
-  conn_opts.cleansession = getCleanSession();
-  conn_opts.context = this;
-  conn_opts.onSuccess = connectionSuccess;
-  conn_opts.onFailure = connectionFailure;
-  conn_opts.connectTimeout = gsl::narrow<int>(connection_timeout_.count());
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setConnectOptions(MQTTAsync_connectOptions& 
connect_options, MQTTProperties& connect_properties,
+                                              MQTTProperties& will_properties, 
const ConnectFinishedTask& connect_finished_task) const {
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    setMqtt5ConnectOptions(connect_options, connect_properties, 
will_properties);
+  } else {
+    setMqtt3ConnectOptions(connect_options);
+  }
+
+  connect_options.context = 
const_cast<ConnectFinishedTask*>(&connect_finished_task);
+  connect_options.connectTimeout = 
gsl::narrow<int>(connection_timeout_.count());
+  connect_options.keepAliveInterval = 
gsl::narrow<int>(keep_alive_interval_.count());
   if (!username_.empty()) {
-    conn_opts.username = username_.c_str();
-    conn_opts.password = password_.c_str();
+    connect_options.username = username_.c_str();
+    connect_options.password = password_.c_str();
   }
   if (sslOpts_) {
-    conn_opts.ssl = &*sslOpts_;
+    connect_options.ssl = const_cast<MQTTAsync_SSLOptions*>(&*sslOpts_);
   }
   if (last_will_) {
-    conn_opts.will = &*last_will_;
+    connect_options.will = const_cast<MQTTAsync_willOptions*>(&*last_will_);
   }
+}
 
-  logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, &conn_opts);
-  if (ret != MQTTASYNC_SUCCESS) {
-    logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+void AbstractMQTTProcessor::setMqtt3ConnectOptions(MQTTAsync_connectOptions& 
connect_options) const {
+  connect_options = MQTTAsync_connectOptions_initializer;
+  connect_options.onSuccess = connectionSuccess;
+  connect_options.onFailure = connectionFailure;
+  connect_options.cleansession = getCleanSession();
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+    connect_options.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+    connect_options.MQTTVersion = MQTTVERSION_3_1_1;
+  }
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
connect_options, MQTTProperties& connect_properties, MQTTProperties& 
will_properties) const {
+  connect_options = MQTTAsync_connectOptions_initializer5;
+  connect_options.onSuccess5 = connectionSuccess5;
+  connect_options.onFailure5 = connectionFailure5;
+  connect_options.connectProperties = &connect_properties;
+
+  connect_options.cleanstart = getCleanStart();
+
+  {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+    property.value.integer4 = 
gsl::narrow<int>(getSessionExpiryInterval().count());
+    MQTTProperties_add(&connect_properties, &property);
+  }
+
+  if (!last_will_content_type_.empty()) {
+    MQTTProperty property;
+    property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+    property.value.data.len = last_will_content_type_.length();
+    property.value.data.data = 
const_cast<char*>(last_will_content_type_.data());
+    MQTTProperties_add(&will_properties, &property);
+  }
+
+  connect_options.willProperties = &will_properties;
+
+  setProcessorSpecificMqtt5ConnectOptions(connect_properties);
+}
+
+void AbstractMQTTProcessor::onTrigger(const 
std::shared_ptr<core::ProcessContext>& context, const 
std::shared_ptr<core::ProcessSession>& session) {
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+    logger_->log_debug("Null-op in onTrigger, processor is shutting down.");
+    return;
+  }
+
+  reconnect();
+
+  if (!MQTTAsync_isConnected(client_)) {
+    logger_->log_error("Could not work with MQTT broker because disconnected 
to %s", uri_);
+    yield();
+    return;
   }
+
+  onTriggerImpl(context, session);
 }
 
 void AbstractMQTTProcessor::freeResources() {
-  if (client_ && MQTTAsync_isConnected(client_)) {
-    MQTTAsync_disconnectOptions disconnect_options = 
MQTTAsync_disconnectOptions_initializer;
-    disconnect_options.context = this;
-    disconnect_options.onSuccess = disconnectionSuccess;
-    disconnect_options.onFailure = disconnectionFailure;
-    disconnect_options.timeout = 
gsl::narrow<int>(std::chrono::milliseconds{connection_timeout_}.count());
-    MQTTAsync_disconnect(client_, &disconnect_options);
+  // write lock
+  std::lock_guard client_lock{client_mutex_};
+
+  if (!client_) {
+    return;
   }
-  if (client_) {
-    MQTTAsync_destroy(&client_);
+
+  disconnect();
+
+  MQTTAsync_destroy(&client_);
+}
+
+void AbstractMQTTProcessor::disconnect() {
+  if (!MQTTAsync_isConnected(client_)) {
+    return;
+  }
+
+  MQTTAsync_disconnectOptions disconnect_options = 
MQTTAsync_disconnectOptions_initializer;
+  ConnectFinishedTask disconnect_finished_task(
+          [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* 
success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* 
failure_data_5) {
+            onDisconnectFinished(success_data, success_data_5, failure_data, 
failure_data_5);
+          });
+  disconnect_options.context = &disconnect_finished_task;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+    disconnect_options.onSuccess5 = connectionSuccess5;
+    disconnect_options.onFailure5 = connectionFailure5;
+  } else {
+    disconnect_options.onSuccess = connectionSuccess;
+    disconnect_options.onFailure = connectionFailure;
+  }
+
+  disconnect_options.timeout = 
gsl::narrow<int>(std::chrono::milliseconds{connection_timeout_}.count());
+
+  const int ret = MQTTAsync_disconnect(client_, &disconnect_options);
+  if (ret != MQTTASYNC_SUCCESS) {
+    logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with 
error code [%d]", uri_, ret);
+    return;
+  }
+
+  // wait until connection succeeds or fails
+  disconnect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) {
+  auto readProperty = [response] (MQTTPropertyCodes property_code, auto& 
out_var) {
+    const int value = MQTTProperties_getNumericValue(&response->properties, 
property_code);
+    if (value != PAHO_MQTT_C_FAILURE_CODE) {
+      if constexpr (std::is_same_v<decltype(out_var), 
std::optional<std::chrono::seconds>&>) {
+        out_var = std::chrono::seconds(value);
+      } else {
+        out_var = gsl::narrow<typename 
std::remove_reference_t<decltype(out_var)>::value_type>(value);
+      }
+    } else {
+      out_var.reset();
+    }
+  };
+
+  readProperty(MQTTPROPERTY_CODE_RETAIN_AVAILABLE, retain_available_);
+  readProperty(MQTTPROPERTY_CODE_WILDCARD_SUBSCRIPTION_AVAILABLE, 
wildcard_subscription_available_);
+  readProperty(MQTTPROPERTY_CODE_SHARED_SUBSCRIPTION_AVAILABLE, 
shared_subscription_available_);
+
+  readProperty(MQTTPROPERTY_CODE_TOPIC_ALIAS_MAXIMUM, 
broker_topic_alias_maximum_);
+  readProperty(MQTTPROPERTY_CODE_RECEIVE_MAXIMUM, broker_receive_maximum_);
+  readProperty(MQTTPROPERTY_CODE_MAXIMUM_QOS, maximum_qos_);
+  readProperty(MQTTPROPERTY_CODE_MAXIMUM_PACKET_SIZE, maximum_packet_size_);
+
+  readProperty(MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL, 
maximum_session_expiry_interval_);
+  readProperty(MQTTPROPERTY_CODE_SERVER_KEEP_ALIVE, server_keep_alive_);
+}
+
+void AbstractMQTTProcessor::checkBrokerLimits() {
+  try {
+    if (server_keep_alive_.has_value() && server_keep_alive_ < 
keep_alive_interval_) {
+      std::ostringstream os;
+      os << "Set Keep Alive Interval (" << keep_alive_interval_.count() << " 
s) is longer then maximum supported by broker (" << server_keep_alive_->count() 
<< " s)";

Review Comment:
   ```suggestion
         os << "Set Keep Alive Interval (" << keep_alive_interval_.count() << " 
s) is longer than maximum supported by broker (" << server_keep_alive_->count() 
<< " s)";
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to