[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2023-01-16 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1071202236


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -137,49 +148,317 @@ void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptrlog_error("MQTT client is not existing while trying to 
reconnect");
+throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 
client is not existing while trying to reconnect");
+  }
+  if (MQTTAsync_isConnected(client_)) {
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
 return;
   }
+
+  MQTTAsync_connectOptions connect_options;
+  MQTTProperties connect_properties = MQTTProperties_initializer;
+  MQTTProperties will_properties = MQTTProperties_initializer;
+
+  ConnectFinishedTask connect_finished_task(
+  [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* 
success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* 
failure_data_5) {
+onConnectFinished(success_data, success_data_5, failure_data, 
failure_data_5);
+  });
+
+  setConnectOptions(connect_options, connect_properties, will_properties, 
connect_finished_task);
+
+  logger_->log_info("Reconnecting to %s", uri_);
   if (MQTTAsync_isConnected(client_)) {
-logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+return;
+  }
+  const int ret = MQTTAsync_connect(client_, _options);
+  MQTTProperties_free(_properties);
+  if (ret != MQTTASYNC_SUCCESS) {
+logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
 return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
-  conn_opts.keepAliveInterval = gsl::narrow(keep_alive_interval_.count());
-  conn_opts.cleansession = getCleanSession();
-  conn_opts.context = this;
-  conn_opts.onSuccess = connectionSuccess;
-  conn_opts.onFailure = connectionFailure;
-  conn_opts.connectTimeout = gsl::narrow(connection_timeout_.count());
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setConnectOptions(MQTTAsync_connectOptions& 
connect_options, MQTTProperties& connect_properties,
+  MQTTProperties& will_properties, 
const ConnectFinishedTask& connect_finished_task) const {
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+setMqtt5ConnectOptions(connect_options, connect_properties, 
will_properties);
+  } else {
+setMqtt3ConnectOptions(connect_options);
+  }
+
+  connect_options.context = 
const_cast(_finished_task);
+  connect_options.connectTimeout = 
gsl::narrow(connection_timeout_.count());
+  connect_options.keepAliveInterval = 
gsl::narrow(keep_alive_interval_.count());
   if (!username_.empty()) {
-conn_opts.username = username_.c_str();
-conn_opts.password = password_.c_str();
+connect_options.username = username_.c_str();
+connect_options.password = password_.c_str();
   }
   if (sslOpts_) {
-conn_opts.ssl = &*sslOpts_;
+connect_options.ssl = const_cast(&*sslOpts_);
   }
   if (last_will_) {
-conn_opts.will = &*last_will_;
+connect_options.will = const_cast(&*last_will_);
   }
+}
 
-  logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, _opts);
-  if (ret != MQTTASYNC_SUCCESS) {
-logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+void AbstractMQTTProcessor::setMqtt3ConnectOptions(MQTTAsync_connectOptions& 
connect_options) const {
+  connect_options = MQTTAsync_connectOptions_initializer;
+  connect_options.onSuccess = connectionSuccess;
+  connect_options.onFailure = connectionFailure;
+  connect_options.cleansession = getCleanSession();
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+connect_options.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+connect_options.MQTTVersion = MQTTVERSION_3_1_1;
+  }
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
connect_options, MQTTProperties& connect_properties, MQTTProperties& 
will_properties) const {
+  connect_options = MQTTAsync_connectOptions_initializer5;
+  connect_options.onSuccess5 = connectionSuccess5;
+  connect_options.onFailure5 = connectionFailure5;
+  connect_options.connectProperties = _properties;
+
+  connect_options.cleanstart = getCleanStart();
+
+  {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+property.value.integer4 = 
gsl::narrow(getSessionExpiryInterval().count());

Review Comment:
   That does not, but the MQTT 5 specification does: 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048
   "Followed by the Four Byte Integer representing the Session 

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2023-01-16 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1071202236


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -137,49 +148,317 @@ void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptrlog_error("MQTT client is not existing while trying to 
reconnect");
+throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 
client is not existing while trying to reconnect");
+  }
+  if (MQTTAsync_isConnected(client_)) {
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
 return;
   }
+
+  MQTTAsync_connectOptions connect_options;
+  MQTTProperties connect_properties = MQTTProperties_initializer;
+  MQTTProperties will_properties = MQTTProperties_initializer;
+
+  ConnectFinishedTask connect_finished_task(
+  [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* 
success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* 
failure_data_5) {
+onConnectFinished(success_data, success_data_5, failure_data, 
failure_data_5);
+  });
+
+  setConnectOptions(connect_options, connect_properties, will_properties, 
connect_finished_task);
+
+  logger_->log_info("Reconnecting to %s", uri_);
   if (MQTTAsync_isConnected(client_)) {
-logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+return;
+  }
+  const int ret = MQTTAsync_connect(client_, _options);
+  MQTTProperties_free(_properties);
+  if (ret != MQTTASYNC_SUCCESS) {
+logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
 return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
-  conn_opts.keepAliveInterval = gsl::narrow(keep_alive_interval_.count());
-  conn_opts.cleansession = getCleanSession();
-  conn_opts.context = this;
-  conn_opts.onSuccess = connectionSuccess;
-  conn_opts.onFailure = connectionFailure;
-  conn_opts.connectTimeout = gsl::narrow(connection_timeout_.count());
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setConnectOptions(MQTTAsync_connectOptions& 
connect_options, MQTTProperties& connect_properties,
+  MQTTProperties& will_properties, 
const ConnectFinishedTask& connect_finished_task) const {
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+setMqtt5ConnectOptions(connect_options, connect_properties, 
will_properties);
+  } else {
+setMqtt3ConnectOptions(connect_options);
+  }
+
+  connect_options.context = 
const_cast(_finished_task);
+  connect_options.connectTimeout = 
gsl::narrow(connection_timeout_.count());
+  connect_options.keepAliveInterval = 
gsl::narrow(keep_alive_interval_.count());
   if (!username_.empty()) {
-conn_opts.username = username_.c_str();
-conn_opts.password = password_.c_str();
+connect_options.username = username_.c_str();
+connect_options.password = password_.c_str();
   }
   if (sslOpts_) {
-conn_opts.ssl = &*sslOpts_;
+connect_options.ssl = const_cast(&*sslOpts_);
   }
   if (last_will_) {
-conn_opts.will = &*last_will_;
+connect_options.will = const_cast(&*last_will_);
   }
+}
 
-  logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, _opts);
-  if (ret != MQTTASYNC_SUCCESS) {
-logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+void AbstractMQTTProcessor::setMqtt3ConnectOptions(MQTTAsync_connectOptions& 
connect_options) const {
+  connect_options = MQTTAsync_connectOptions_initializer;
+  connect_options.onSuccess = connectionSuccess;
+  connect_options.onFailure = connectionFailure;
+  connect_options.cleansession = getCleanSession();
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+connect_options.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+connect_options.MQTTVersion = MQTTVERSION_3_1_1;
+  }
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
connect_options, MQTTProperties& connect_properties, MQTTProperties& 
will_properties) const {
+  connect_options = MQTTAsync_connectOptions_initializer5;
+  connect_options.onSuccess5 = connectionSuccess5;
+  connect_options.onFailure5 = connectionFailure5;
+  connect_options.connectProperties = _properties;
+
+  connect_options.cleanstart = getCleanStart();
+
+  {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+property.value.integer4 = 
gsl::narrow(getSessionExpiryInterval().count());

Review Comment:
   That does not, but the MQTT 5 specification does: 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901048
   "Followed by the Four Byte Integer representing the Session 

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-07 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1042215060


##
extensions/mqtt/processors/AbstractMQTTProcessor.h:
##
@@ -46,150 +44,178 @@ class AbstractMQTTProcessor : public core::Processor {
 freeResources();
   }
 
+  SMART_ENUM(MqttVersions,
+(V_3X_AUTO, "3.x AUTO"),
+(V_3_1_0, "3.1.0"),
+(V_3_1_1, "3.1.1"),
+(V_5_0, "5.0"));
+
+  SMART_ENUM(MqttQoS,
+(LEVEL_0, "0"),
+(LEVEL_1, "1"),
+(LEVEL_2, "2"));
+
   EXTENSIONAPI static const core::Property BrokerURI;
   EXTENSIONAPI static const core::Property ClientID;
+  EXTENSIONAPI static const core::Property QoS;
+  EXTENSIONAPI static const core::Property MqttVersion;
+  EXTENSIONAPI static const core::Property ConnectionTimeout;
+  EXTENSIONAPI static const core::Property KeepAliveInterval;
+  EXTENSIONAPI static const core::Property LastWillTopic;
+  EXTENSIONAPI static const core::Property LastWillMessage;
+  EXTENSIONAPI static const core::Property LastWillQoS;
+  EXTENSIONAPI static const core::Property LastWillRetain;
+  EXTENSIONAPI static const core::Property LastWillContentType;
   EXTENSIONAPI static const core::Property Username;
   EXTENSIONAPI static const core::Property Password;
-  EXTENSIONAPI static const core::Property KeepAliveInterval;
-  EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property ConnectionTimeout;
-  EXTENSIONAPI static const core::Property Topic;
-  EXTENSIONAPI static const core::Property QoS;
   EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword;
-  EXTENSIONAPI static const core::Property LastWillTopic;
-  EXTENSIONAPI static const core::Property LastWillMessage;
-  EXTENSIONAPI static const core::Property LastWillQoS;
-  EXTENSIONAPI static const core::Property LastWillRetain;
 
-  EXTENSIONAPI static auto properties() {
+
+  static auto basicProperties() {
+return std::array{
+  BrokerURI,
+  ClientID,
+  MqttVersion
+};
+  }
+
+  static auto advancedProperties() {
 return std::array{
-BrokerURI,
-Topic,
-ClientID,
-QoS,
-ConnectionTimeout,
-KeepAliveInterval,
-MaxFlowSegSize,
-LastWillTopic,
-LastWillMessage,
-LastWillQoS,
-LastWillRetain,
-Username,
-Password,
-SecurityProtocol,
-SecurityCA,
-SecurityCert,
-SecurityPrivateKey,
-SecurityPrivateKeyPassword
+  QoS,
+  ConnectionTimeout,
+  KeepAliveInterval,
+  LastWillTopic,
+  LastWillMessage,
+  LastWillQoS,
+  LastWillRetain,
+  LastWillContentType,
+  Username,
+  Password,
+  SecurityProtocol,
+  SecurityCA,
+  SecurityCert,
+  SecurityPrivateKey,
+  SecurityPrivateKeyPassword
 };
   }
 
   void onSchedule(const std::shared_ptr& context, const 
std::shared_ptr& factory) override;
+  void onTrigger(const std::shared_ptr& context, const 
std::shared_ptr& session) override;
 
   void notifyStop() override {
 freeResources();
   }
 
  protected:
+  struct MQTTMessageDeleter {
+void operator()(MQTTAsync_message* message) {
+  MQTTAsync_freeMessage();
+}
+  };
+
+  struct SmartMessage {
+std::unique_ptr contents;
+std::string topic;
+  };
+
+  // defined by Paho MQTT C library
+  static constexpr int PAHO_MQTT_C_FAILURE_CODE = -999;
+  static constexpr int MQTT_MAX_RECEIVE_MAXIMUM = 65535;
+
+  /**
+   * Connect to MQTT broker. Synchronously waits until connection succeeds or 
fails.
+   */
   void reconnect();
 
+  /**
+   * Checks property consistency before connecting to broker
+   */
+  virtual void checkProperties() {
+  }
+
+  /**
+   * Checks broker limits and supported features vs our desired features after 
connecting to broker
+   */
+  void checkBrokerLimits();
+  virtual void checkBrokerLimitsImpl() = 0;
+
+  // variables being used for a synchronous connection and disconnection
+  std::shared_mutex client_mutex_;
+
   MQTTAsync client_ = nullptr;
   std::string uri_;
-  std::string topic_;
   std::chrono::seconds keep_alive_interval_{60};
-  uint64_t max_seg_size_ = std::numeric_limits::max();
-  std::chrono::seconds connection_timeout_{30};
-  uint32_t qos_ = MQTT_QOS_1;
+  std::chrono::seconds connection_timeout_{10};
+  MqttQoS qos_{MqttQoS::LEVEL_0};
   std::string clientID_;
   std::string username_;
   std::string password_;
+  MqttVersions mqtt_version_{MqttVersions::V_3X_AUTO};
 
- private:
-  // MQTT async callback
-  static int msgReceived(void *context, char* topic_name, int topic_len, 

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-07 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1042214797


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -137,19 +148,45 @@ void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptrlog_error("MQTT client is not existing while trying to 
reconnect");
-return;
+throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 
client is not existing while trying to reconnect");
   }
   if (MQTTAsync_isConnected(client_)) {
-logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
 return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
+
+  MQTTAsync_connectOptions conn_opts;
+  MQTTProperties connect_props = MQTTProperties_initializer;
+  MQTTProperties will_props = MQTTProperties_initializer;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+conn_opts = MQTTAsync_connectOptions_initializer5;
+conn_opts.onSuccess5 = connectionSuccess5;
+conn_opts.onFailure5 = connectionFailure5;
+conn_opts.connectProperties = _props;
+  } else {
+conn_opts = MQTTAsync_connectOptions_initializer;
+conn_opts.onSuccess = connectionSuccess;
+conn_opts.onFailure = connectionFailure;
+  }
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+conn_opts.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+conn_opts.MQTTVersion = MQTTVERSION_3_1_1;
+  }
+
   conn_opts.keepAliveInterval = gsl::narrow(keep_alive_interval_.count());
-  conn_opts.cleansession = getCleanSession();
-  conn_opts.context = this;
-  conn_opts.onSuccess = connectionSuccess;
-  conn_opts.onFailure = connectionFailure;
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+setMqtt5ConnectOptions(conn_opts, connect_props, will_props);

Review Comment:
   Done.



##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, _opts);
+  if (MQTTAsync_isConnected(client_)) {
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+return;
+  }
+  const int ret = MQTTAsync_connect(client_, _opts);
+  MQTTProperties_free(_props);
   if (ret != MQTTASYNC_SUCCESS) {
-logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
+return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+property.value.integer4 = 
gsl::narrow(getSessionExpiryInterval().count());
+MQTTProperties_add(_props, );
+  }
+
+  if (!last_will_content_type_.empty()) {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+property.value.data.len = last_will_content_type_.length();
+property.value.data.data = 
const_cast(last_will_content_type_.data());
+MQTTProperties_add(_props, );
+  }
+
+  conn_opts.willProperties = _props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const 
std::shared_ptr& context, const 
std::shared_ptr& session) {
+  // read lock
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+// we are shutting down

Review Comment:
   Done.



##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, _opts);
+  if (MQTTAsync_isConnected(client_)) {
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+return;
+  }
+  const int ret = MQTTAsync_connect(client_, _opts);
+  MQTTProperties_free(_props);
   if (ret != MQTTASYNC_SUCCESS) {
-logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
+return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+MQTTProperty property;
+property.identifier = 

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-07 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1042144555


##
PROCESSORS.md:
##
@@ -337,28 +337,34 @@ This Processor gets the contents of a FlowFile from a 
MQTT broker for a specifie
 
 In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
 
-| Name  | Default Value | Allowable Values | Description   

  |
-|---|---|--|-|
-| **Broker URI**|   |  | The URI to use to 
connect to the MQTT broker  
  |
-| **Topic** |   |  | The topic to 
subscribe to
   |
-| Client ID |   |  | MQTT client ID to 
use 
  |
-| Quality of Service| 0 |  | The Quality of 
Service (QoS) to receive the message with. Accepts three values '0', '1' and 
'2' |
-| Connection Timeout| 30 sec|  | Maximum time 
interval the client will wait for the network connection to the MQTT broker 
   |
-| Keep Alive Interval   | 60 sec|  | Defines the 
maximum time interval between messages being sent to the broker 
|
-| Max Flow Segment Size |   |  | Maximum flow 
content payload segment size for the MQTT record
   |
-| Last Will Topic   |   |  | The topic to send 
the client's Last Will to. If the Last Will topic is not set then a Last Will 
will not be sent|
-| Last Will Message |   |  | The message to 
send as the client's Last Will. If the Last Will Message is empty, Last Will 
will be deleted from the broker |
-| Last Will QoS | 0 |  | The Quality of 
Service (QoS) to send the last will with. Accepts three values '0', '1' and '2' 
 |
-| Last Will Retain  | false |  | Whether to retain 
the client's Last Will  
  |
-| Security Protocol |   |  | Protocol used to 
communicate with brokers
   |
-| Security CA   |   |  | File or directory 
path to CA certificate(s) for verifying the broker's key
  |
-| Security Cert |   |  | Path to client's 
public key (PEM) used for authentication
   |
-| Security Private Key  |   |  | Path to client's 
private key (PEM) used for authentication   
   |
-| Security Pass Phrase  |   |  | Private key 
passphrase  
|
-| Username  |   |  | Username to use 
when connecting to the broker   
|
-| Password  |   |  | Password to use 
when connecting to the broker   
|
-| Clean Session | true  |  | Whether to start 
afresh rather than remembering previous subscriptions   
   |
-| Queue Max Message | 1000  |  | Maximum number of 
messages allowed on the received MQTT queue 
  |
+| Name| Default Value | Allowable Values| 
Description 
|

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-07 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1042119988


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }

Review Comment:
   Update: if the Topic property is empty, even expression language cannot 
help, so I am putting the check back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041328112


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty(Retain)) {
 retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval)) {
+message_expiry_interval_ = 
std::chrono::duration_cast(message_expiry_interval->getMilliseconds());
+logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", 
int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr& 
context, const std::shared_ptr ) {
+  std::shared_ptr flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not publish to MQTT broker because disconnected 
to %s", uri_);
-yield();
+  if (!flow_file) {
 return;
   }
 
-  std::shared_ptr flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  
in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
 
-  if (!flowFile) {
-return;
+  const auto topic = getTopic(context, flow_file);
+  try {
+const auto result = session->readBuffer(flow_file);
+if (result.status < 0 || !sendMessage(result.buffer, topic, 
getContentType(context, flow_file), flow_file)) {
+  logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on 
broker %s", flow_file->getUUIDStr(), topic, uri_);
+  session->transfer(flow_file, Failure);
+  return;
+}
+logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT 
topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+session->transfer(flow_file, Success);
+  } catch (const Exception& ex) {
+logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on 
broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, 
ex.what());
+session->transfer(flow_file, Failure);
+  }
+}
+
+bool PublishMQTT::sendMessage(const std::vector& buffer, const 
std::string& topic, const std::string& content_type, const 
std::shared_ptr& flow_file) {
+  if (buffer.size() > 268'435'455) {
+logger_->log_error("Sending message failed because MQTT limit maximum 
packet size [268'435'455] is exceeded by FlowFile of [%zu]", buffer.size());
   }
 
-  PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, 
topic_, client_, gsl::narrow(qos_), retain_);
-  session->read(flowFile, std::ref(callback));
-  if (callback.status_ < 0) {
-logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
-session->transfer(flowFile, Failure);
+  if (maximum_packet_size_.has_value() && buffer.size() > 
*(maximum_packet_size_)) {
+logger_->log_error("Sending message failed because broker-requested 
maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%zu]",
+   *maximum_packet_size_, buffer.size());
+  }
+
+  MQTTAsync_message message_to_publish = MQTTAsync_message_initializer;
+  message_to_publish.payload = const_cast(buffer.data());
+  message_to_publish.payloadlen = buffer.size();
+  message_to_publish.qos = qos_.value();
+  message_to_publish.retained = retain_;
+
+  setMqtt5Properties(message_to_publish, content_type, flow_file);
+
+  MQTTAsync_responseOptions response_options = 
MQTTAsync_responseOptions_initializer;
+  if (mqtt_version_ == MqttVersions::V_5_0) {
+response_options.onSuccess5 = sendSuccess5;
+response_options.onFailure5 = sendFailure5;
   } else {
-logger_->log_debug("Sent flow with length %d to MQTT topic %s", 
callback.read_size_, topic_);
-session->transfer(flowFile, Success);
-  }
-}
-
-int64_t PublishMQTT::ReadCallback::operator()(const 
std::shared_ptr& stream) {
-  if (flow_size_ < max_seg_size_)
-max_seg_size_ = flow_size_;
-  gsl_Expects(max_seg_size_ < 
gsl::narrow(std::numeric_limits::max()));
-  std::vector buffer(max_seg_size_);
-  read_size_ = 0;
-  status_ = 0;
-  while (read_size_ < flow_size_) {
-// MQTTClient_message::payloadlen is int, so we can't handle 2GB+
-const auto readRet = stream->read(buffer);
-

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041315835


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty(Retain)) {
 retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval)) {
+message_expiry_interval_ = 
std::chrono::duration_cast(message_expiry_interval->getMilliseconds());
+logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", 
int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr& 
context, const std::shared_ptr ) {
+  std::shared_ptr flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not publish to MQTT broker because disconnected 
to %s", uri_);
-yield();
+  if (!flow_file) {
 return;
   }
 
-  std::shared_ptr flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  
in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
 
-  if (!flowFile) {
-return;
+  const auto topic = getTopic(context, flow_file);
+  try {
+const auto result = session->readBuffer(flow_file);
+if (result.status < 0 || !sendMessage(result.buffer, topic, 
getContentType(context, flow_file), flow_file)) {
+  logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on 
broker %s", flow_file->getUUIDStr(), topic, uri_);
+  session->transfer(flow_file, Failure);
+  return;
+}
+logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT 
topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+session->transfer(flow_file, Success);
+  } catch (const Exception& ex) {
+logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on 
broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, 
ex.what());
+session->transfer(flow_file, Failure);
+  }
+}
+
+bool PublishMQTT::sendMessage(const std::vector& buffer, const 
std::string& topic, const std::string& content_type, const 
std::shared_ptr& flow_file) {
+  if (buffer.size() > 268'435'455) {
+logger_->log_error("Sending message failed because MQTT limit maximum 
packet size [268'435'455] is exceeded by FlowFile of [%zu]", buffer.size());
   }
 
-  PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, 
topic_, client_, gsl::narrow(qos_), retain_);
-  session->read(flowFile, std::ref(callback));
-  if (callback.status_ < 0) {
-logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
-session->transfer(flowFile, Failure);
+  if (maximum_packet_size_.has_value() && buffer.size() > 
*(maximum_packet_size_)) {
+logger_->log_error("Sending message failed because broker-requested 
maximum packet size [%" PRIu32 "] is exceeded by FlowFile of [%zu]",
+   *maximum_packet_size_, buffer.size());
+  }
+
+  MQTTAsync_message message_to_publish = MQTTAsync_message_initializer;
+  message_to_publish.payload = const_cast(buffer.data());
+  message_to_publish.payloadlen = buffer.size();
+  message_to_publish.qos = qos_.value();
+  message_to_publish.retained = retain_;
+
+  setMqtt5Properties(message_to_publish, content_type, flow_file);
+
+  MQTTAsync_responseOptions response_options = 
MQTTAsync_responseOptions_initializer;
+  if (mqtt_version_ == MqttVersions::V_5_0) {
+response_options.onSuccess5 = sendSuccess5;
+response_options.onFailure5 = sendFailure5;
   } else {
-logger_->log_debug("Sent flow with length %d to MQTT topic %s", 
callback.read_size_, topic_);
-session->transfer(flowFile, Success);
-  }
-}
-
-int64_t PublishMQTT::ReadCallback::operator()(const 
std::shared_ptr& stream) {
-  if (flow_size_ < max_seg_size_)
-max_seg_size_ = flow_size_;
-  gsl_Expects(max_seg_size_ < 
gsl::narrow(std::numeric_limits::max()));
-  std::vector buffer(max_seg_size_);
-  read_size_ = 0;
-  status_ = 0;
-  while (read_size_ < flow_size_) {
-// MQTTClient_message::payloadlen is int, so we can't handle 2GB+
-const auto readRet = stream->read(buffer);
-

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041313305


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty(Retain)) {
 retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval)) {
+message_expiry_interval_ = 
std::chrono::duration_cast(message_expiry_interval->getMilliseconds());
+logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", 
int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr& 
context, const std::shared_ptr ) {
+  std::shared_ptr flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not publish to MQTT broker because disconnected 
to %s", uri_);
-yield();
+  if (!flow_file) {
 return;
   }
 
-  std::shared_ptr flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  
in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
 
-  if (!flowFile) {
-return;
+  const auto topic = getTopic(context, flow_file);
+  try {
+const auto result = session->readBuffer(flow_file);
+if (result.status < 0 || !sendMessage(result.buffer, topic, 
getContentType(context, flow_file), flow_file)) {
+  logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on 
broker %s", flow_file->getUUIDStr(), topic, uri_);
+  session->transfer(flow_file, Failure);
+  return;
+}
+logger_->log_debug("Sent flow file [%s] with length %" PRId64 " to MQTT 
topic '%s' on broker %s", flow_file->getUUIDStr(), result.status, topic, uri_);
+session->transfer(flow_file, Success);
+  } catch (const Exception& ex) {
+logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on 
broker %s, exception string: '%s'", flow_file->getUUIDStr(), topic, uri_, 
ex.what());
+session->transfer(flow_file, Failure);
+  }
+}
+
+bool PublishMQTT::sendMessage(const std::vector& buffer, const 
std::string& topic, const std::string& content_type, const 
std::shared_ptr& flow_file) {
+  if (buffer.size() > 268'435'455) {

Review Comment:
   It comes from the MQTT specification. The Remaining Length field is a 
Variable Byte Integer:
   https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901024
   The maximum value of a Variable Byte Integer is 268,435,455. I found the 
decimal representation used all over the internet, so I think we should stay 
with this more commonly used form.
   https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011
   But I'm extracting it to a constant as requested; I don't like magic numbers 
either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041290532


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty(Retain)) {
 retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval)) {
+message_expiry_interval_ = 
std::chrono::duration_cast(message_expiry_interval->getMilliseconds());
+logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", 
int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr& 
context, const std::shared_ptr ) {
+  std::shared_ptr flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not publish to MQTT broker because disconnected 
to %s", uri_);
-yield();
+  if (!flow_file) {
 return;

Review Comment:
   Good point! Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041288113


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,229 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }

Review Comment:
   `Topic` is an expression language property. So you are right, it can become 
valid later. I am removing this error message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041281537


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [, _status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message, logger_);
+try {
+  session->write(flow_file, write_callback);
+} catch (const Exception& ex) {
+  logger_->log_error("Error when processing message queue: %s", ex.what());
+}
+if (!write_callback.getSuccessStatus()) {
+  

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041281303


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [, _status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message, logger_);
+try {
+  session->write(flow_file, write_callback);
+} catch (const Exception& ex) {
+  logger_->log_error("Error when processing message queue: %s", ex.what());
+}
+if (!write_callback.getSuccessStatus()) {
+  

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041280626


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [, _status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message, logger_);
+try {
+  session->write(flow_file, write_callback);
+} catch (const Exception& ex) {
+  logger_->log_error("Error when processing message queue: %s", ex.what());
+}
+if (!write_callback.getSuccessStatus()) {
+  

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041268666


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [, _status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message, logger_);
+try {
+  session->write(flow_file, write_callback);
+} catch (const Exception& ex) {
+  logger_->log_error("Error when processing message queue: %s", ex.what());
+}
+if (!write_callback.getSuccessStatus()) {
+  

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041258769


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,333 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [, _status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message, logger_);
+try {
+  session->write(flow_file, write_callback);

Review Comment:
   Good catch! I tested it with dummy code, and `std::ref` is indeed necessary.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on 

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041159887


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, _opts);
+  if (MQTTAsync_isConnected(client_)) {
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+return;
+  }
+  const int ret = MQTTAsync_connect(client_, _opts);
+  MQTTProperties_free(_props);
   if (ret != MQTTASYNC_SUCCESS) {
-logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
+return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+property.value.integer4 = 
gsl::narrow(getSessionExpiryInterval().count());
+MQTTProperties_add(_props, );
+  }
+
+  if (!last_will_content_type_.empty()) {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+property.value.data.len = last_will_content_type_.length();
+property.value.data.data = 
const_cast(last_will_content_type_.data());
+MQTTProperties_add(_props, );
+  }
+
+  conn_opts.willProperties = _props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const 
std::shared_ptr& context, const 
std::shared_ptr& session) {
+  // read lock
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+// we are shutting down
+return;
+  }
+
+  // reconnect if needed
+  reconnect();
+
+  if (!MQTTAsync_isConnected(client_)) {
+logger_->log_error("Could not work with MQTT broker because disconnected 
to %s", uri_);
+yield();
+return;
+  }
+
+  onTriggerImpl(context, session);
 }
 
 void AbstractMQTTProcessor::freeResources() {
-  if (client_ && MQTTAsync_isConnected(client_)) {
-MQTTAsync_disconnectOptions disconnect_options = 
MQTTAsync_disconnectOptions_initializer;
-disconnect_options.context = this;
-disconnect_options.onSuccess = disconnectionSuccess;
-disconnect_options.onFailure = disconnectionFailure;
-disconnect_options.timeout = 
gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count());
-MQTTAsync_disconnect(client_, _options);
+  // write lock
+  std::lock_guard client_lock{client_mutex_};
+
+  if (!client_) {
+return;
   }
-  if (client_) {
-MQTTAsync_destroy(_);
+
+  disconnect();
+
+  MQTTAsync_destroy(_);
+}
+
+void AbstractMQTTProcessor::disconnect() {
+  if (!MQTTAsync_isConnected(client_)) {
+return;
+  }
+
+  MQTTAsync_disconnectOptions disconnect_options = 
MQTTAsync_disconnectOptions_initializer;
+  std::packaged_task disconnect_finished_task(
+  [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* 
success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* 
failure_data_5) {
+onDisconnectFinished(success_data, success_data_5, failure_data, 
failure_data_5);
+  });
+  disconnect_options.context = _finished_task;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+disconnect_options.onSuccess5 = connectionSuccess5;
+disconnect_options.onFailure5 = connectionFailure5;
+  } else {
+disconnect_options.onSuccess = connectionSuccess;
+disconnect_options.onFailure = connectionFailure;
+  }
+
+  disconnect_options.timeout = 
gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count());
+
+  const int ret = MQTTAsync_disconnect(client_, _options);
+  if (ret != MQTTASYNC_SUCCESS) {
+logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with 
error code [%d]", uri_, ret);
+return;
+  }
+
+  // wait until connection succeeds or fails
+  disconnect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) {
+  auto readProperty = [response] (MQTTPropertyCodes property_code, auto& 
out_var) {
+const int value = MQTTProperties_getNumericValue(>properties, 
property_code);
+if (value != PAHO_MQTT_C_FAILURE_CODE) {
+  if constexpr (std::is_same_v&>) {
+out_var = std::chrono::seconds(value);
+  } else {
+out_var = gsl::narrow::value_type>(value);
+  }
+} else {
+  out_var.reset();
+}
+  };
+
+  readProperty(MQTTPROPERTY_CODE_RETAIN_AVAILABLE, retain_available_);
+  readProperty(MQTTPROPERTY_CODE_WILDCARD_SUBSCRIPTION_AVAILABLE, 
wildcard_subscription_available_);
+  

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1041148586


##
libminifi/include/utils/Enum.h:
##
@@ -127,7 +128,7 @@ namespace utils {
 #define SMART_ENUM(Clazz, ...) \
   struct Clazz { \
 using Base = ::org::apache::nifi::minifi::utils::EnumBase; \
-enum Type { \
+enum Type : int { \

Review Comment:
   It would still not enable `SMART_ENUM` initialization from an integer without a 
cast. It only enables that for its nested `Type` enum.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1040850939


##
extensions/mqtt/processors/AbstractMQTTProcessor.h:
##
@@ -46,150 +44,178 @@ class AbstractMQTTProcessor : public core::Processor {
 freeResources();
   }
 
+  SMART_ENUM(MqttVersions,
+(V_3X_AUTO, "3.x AUTO"),
+(V_3_1_0, "3.1.0"),
+(V_3_1_1, "3.1.1"),
+(V_5_0, "5.0"));
+
+  SMART_ENUM(MqttQoS,
+(LEVEL_0, "0"),
+(LEVEL_1, "1"),
+(LEVEL_2, "2"));
+
   EXTENSIONAPI static const core::Property BrokerURI;
   EXTENSIONAPI static const core::Property ClientID;
+  EXTENSIONAPI static const core::Property QoS;
+  EXTENSIONAPI static const core::Property MqttVersion;
+  EXTENSIONAPI static const core::Property ConnectionTimeout;
+  EXTENSIONAPI static const core::Property KeepAliveInterval;
+  EXTENSIONAPI static const core::Property LastWillTopic;
+  EXTENSIONAPI static const core::Property LastWillMessage;
+  EXTENSIONAPI static const core::Property LastWillQoS;
+  EXTENSIONAPI static const core::Property LastWillRetain;
+  EXTENSIONAPI static const core::Property LastWillContentType;
   EXTENSIONAPI static const core::Property Username;
   EXTENSIONAPI static const core::Property Password;
-  EXTENSIONAPI static const core::Property KeepAliveInterval;
-  EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property ConnectionTimeout;
-  EXTENSIONAPI static const core::Property Topic;
-  EXTENSIONAPI static const core::Property QoS;
   EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassword;
-  EXTENSIONAPI static const core::Property LastWillTopic;
-  EXTENSIONAPI static const core::Property LastWillMessage;
-  EXTENSIONAPI static const core::Property LastWillQoS;
-  EXTENSIONAPI static const core::Property LastWillRetain;
 
-  EXTENSIONAPI static auto properties() {
+
+  static auto basicProperties() {
+return std::array{
+  BrokerURI,
+  ClientID,
+  MqttVersion
+};
+  }
+
+  static auto advancedProperties() {
 return std::array{
-BrokerURI,
-Topic,
-ClientID,
-QoS,
-ConnectionTimeout,
-KeepAliveInterval,
-MaxFlowSegSize,
-LastWillTopic,
-LastWillMessage,
-LastWillQoS,
-LastWillRetain,
-Username,
-Password,
-SecurityProtocol,
-SecurityCA,
-SecurityCert,
-SecurityPrivateKey,
-SecurityPrivateKeyPassword
+  QoS,
+  ConnectionTimeout,
+  KeepAliveInterval,
+  LastWillTopic,
+  LastWillMessage,
+  LastWillQoS,
+  LastWillRetain,
+  LastWillContentType,
+  Username,
+  Password,
+  SecurityProtocol,
+  SecurityCA,
+  SecurityCert,
+  SecurityPrivateKey,
+  SecurityPrivateKeyPassword
 };
   }
 
   void onSchedule(const std::shared_ptr& context, const 
std::shared_ptr& factory) override;
+  void onTrigger(const std::shared_ptr& context, const 
std::shared_ptr& session) override;
 
   void notifyStop() override {
 freeResources();
   }
 
  protected:
+  struct MQTTMessageDeleter {
+void operator()(MQTTAsync_message* message) {
+  MQTTAsync_freeMessage();

Review Comment:
   It is required, the signature is `void 
MQTTAsync_freeMessage(MQTTAsync_message** msg)`:
   
https://www.eclipse.org/paho/files/mqttdoc/MQTTAsync/html/_m_q_t_t_async_8h.html#a9b45db63052fe29ab1fad22d2a00c91c



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1040845912


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -163,23 +200,259 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, _opts);
+  if (MQTTAsync_isConnected(client_)) {
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+return;
+  }
+  const int ret = MQTTAsync_connect(client_, _opts);
+  MQTTProperties_free(_props);
   if (ret != MQTTASYNC_SUCCESS) {
-logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
+return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+property.value.integer4 = 
gsl::narrow(getSessionExpiryInterval().count());
+MQTTProperties_add(_props, );
+  }

Review Comment:
   To me it looks like `property` is copied by value by `MQTTProperties_add()`: 
https://github.com/eclipse/paho.mqtt.c/blob/master/src/MQTTProperties.c#L127



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-06 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1039819121


##
libminifi/include/utils/Enum.h:
##
@@ -39,6 +39,7 @@ namespace utils {
 constexpr Clazz(Type value = static_cast(-1)) : value_{value} {} \
 explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} 
{} \
 explicit Clazz(const char* str) : value_{parse(str).value_} {} \
+explicit Clazz(std::nullptr_t) = delete; \

Review Comment:
   Previously initializing with `0` was allowed, with the `const char*` 
constructor. This could be misleading, now it's not allowed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-05 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1039826380


##
PROCESSORS.md:
##
@@ -2364,6 +2380,8 @@ In the list below, the names of required properties 
appear in bold. Any other pr
 ### Description
 
 Routes FlowFiles based on their Attributes using the Attribute Expression 
Language.
+Any number of user-defined dynamic properties can be added, which all support 
the Attribute Expression Language. Relationships matching the name of the 
properties will be added.
+FlowFiles will be routed to all the relationships whose matching property 
evaluates to "true". Unmatched FlowFiles will be routed for "unmatched" 
relationship, while failed ones to "failure".

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-05 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1039825741


##
PROCESSORS.md:
##
@@ -337,28 +337,34 @@ This Processor gets the contents of a FlowFile from a 
MQTT broker for a specifie
 
 In the list below, the names of required properties appear in bold. Any other 
properties (not in bold) are considered optional. The table also indicates any 
default values, and whether a property supports the NiFi Expression Language.
 
-| Name  | Default Value | Allowable Values | Description   

  |
-|---|---|--|-|
-| **Broker URI**|   |  | The URI to use to 
connect to the MQTT broker  
  |
-| **Topic** |   |  | The topic to 
subscribe to
   |
-| Client ID |   |  | MQTT client ID to 
use 
  |
-| Quality of Service| 0 |  | The Quality of 
Service (QoS) to receive the message with. Accepts three values '0', '1' and 
'2' |
-| Connection Timeout| 30 sec|  | Maximum time 
interval the client will wait for the network connection to the MQTT broker 
   |
-| Keep Alive Interval   | 60 sec|  | Defines the 
maximum time interval between messages being sent to the broker 
|
-| Max Flow Segment Size |   |  | Maximum flow 
content payload segment size for the MQTT record
   |
-| Last Will Topic   |   |  | The topic to send 
the client's Last Will to. If the Last Will topic is not set then a Last Will 
will not be sent|
-| Last Will Message |   |  | The message to 
send as the client's Last Will. If the Last Will Message is empty, Last Will 
will be deleted from the broker |
-| Last Will QoS | 0 |  | The Quality of 
Service (QoS) to send the last will with. Accepts three values '0', '1' and '2' 
 |
-| Last Will Retain  | false |  | Whether to retain 
the client's Last Will  
  |
-| Security Protocol |   |  | Protocol used to 
communicate with brokers
   |
-| Security CA   |   |  | File or directory 
path to CA certificate(s) for verifying the broker's key
  |
-| Security Cert |   |  | Path to client's 
public key (PEM) used for authentication
   |
-| Security Private Key  |   |  | Path to client's 
private key (PEM) used for authentication   
   |
-| Security Pass Phrase  |   |  | Private key 
passphrase  
|
-| Username  |   |  | Username to use 
when connecting to the broker   
|
-| Password  |   |  | Password to use 
when connecting to the broker   
|
-| Clean Session | true  |  | Whether to start 
afresh rather than remembering previous subscriptions   
   |
-| Queue Max Message | 1000  |  | Maximum number of 
messages allowed on the received MQTT queue 
  |
+| Name| Default Value | Allowable Values| 
Description 
|

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-05 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1039823934


##
libminifi/include/utils/Enum.h:
##
@@ -127,7 +128,7 @@ namespace utils {
 #define SMART_ENUM(Clazz, ...) \
   struct Clazz { \
 using Base = ::org::apache::nifi::minifi::utils::EnumBase; \
-enum Type { \
+enum Type : int { \

Review Comment:
   Since C++17, an enumeration can be initialized from an integer without a 
cast. For unscoped enums, this works only if the underlying type is fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-12-05 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1039819121


##
libminifi/include/utils/Enum.h:
##
@@ -39,6 +39,7 @@ namespace utils {
 constexpr Clazz(Type value = static_cast(-1)) : value_{value} {} \
 explicit Clazz(const std::string& str) : value_{parse(str.c_str()).value_} 
{} \
 explicit Clazz(const char* str) : value_{parse(str).value_} {} \
+explicit Clazz(std::nullptr_t) = delete; \

Review Comment:
   Previously initializing with `0` was allowed, with the `const char*` 
constructor. This could be misleading, now it's not allowed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-14 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1021602103


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,267 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty(Retain)) {
 retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval)) {
+message_expiry_interval_ = 
std::chrono::duration_cast(message_expiry_interval->getMilliseconds());
+logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", 
int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr& 
context, const std::shared_ptr ) {
+  std::shared_ptr flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not publish to MQTT broker because disconnected 
to %s", uri_);
-yield();
+  if (!flow_file) {
 return;
   }
 
-  std::shared_ptr flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(65535));
 
-  if (!flowFile) {
-return;
-  }
-
-  PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, 
topic_, client_, gsl::narrow(qos_), retain_);
-  session->read(flowFile, std::ref(callback));
-  if (callback.status_ < 0) {
-logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
-session->transfer(flowFile, Failure);
+  const auto topic = getTopic(context, flow_file);
+  PublishMQTT::ReadCallback callback(this, flow_file, topic, 
getContentType(context, flow_file));
+  session->read(flow_file, std::ref(callback));
+  if (!callback.getSuccessStatus()) {
+logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on 
broker %s", flow_file->getUUIDStr(), topic, uri_);
+session->transfer(flow_file, Failure);
   } else {
-logger_->log_debug("Sent flow with length %d to MQTT topic %s", 
callback.read_size_, topic_);
-session->transfer(flowFile, Success);
+logger_->log_debug("Sent flow file [%s] with length %d to MQTT topic '%s' 
on broker %s", flow_file->getUUIDStr(), callback.getReadSize(), topic, uri_);
+session->transfer(flow_file, Success);
   }
 }
 
 int64_t PublishMQTT::ReadCallback::operator()(const 
std::shared_ptr& stream) {
-  if (flow_size_ < max_seg_size_)
-max_seg_size_ = flow_size_;
-  gsl_Expects(max_seg_size_ < 
gsl::narrow(std::numeric_limits::max()));
-  std::vector buffer(max_seg_size_);
+  if (flow_file_->getSize() > 268'435'455) {
+processor_->logger_->log_error("Sending message failed because MQTT limit 
maximum packet size [268'435'455] is exceeded by FlowFile of [%" PRIu64 "]", 
flow_file_->getSize());
+success_status_ = false;
+return -1;
+  }
+
+  if (processor_->maximum_packet_size_.has_value() && flow_file_->getSize() > 
*(processor_->maximum_packet_size_)) {
+processor_->logger_->log_error("Sending message failed because 
broker-requested maximum packet size [%" PRIu32 "] is exceeded by FlowFile of 
[%" PRIu64 "]",
+   *processor_->maximum_packet_size_, 
flow_file_->getSize());
+success_status_ = false;
+return -1;
+  }
+
+  std::vector buffer(flow_file_->getSize());
   read_size_ = 0;
-  status_ = 0;
-  while (read_size_ < flow_size_) {
-// MQTTClient_message::payloadlen is int, so we can't handle 2GB+
-const auto readRet = stream->read(buffer);
+  success_status_ = true;
+  while (read_size_ < flow_file_->getSize()) {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-14 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1021602511


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,267 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty(Retain)) {
 retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval)) {
+message_expiry_interval_ = 
std::chrono::duration_cast(message_expiry_interval->getMilliseconds());
+logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", 
int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr& 
context, const std::shared_ptr ) {
+  std::shared_ptr flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not publish to MQTT broker because disconnected 
to %s", uri_);
-yield();
+  if (!flow_file) {
 return;
   }
 
-  std::shared_ptr flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  
in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(MQTT_MAX_RECEIVE_MAXIMUM));
 
-  if (!flowFile) {
-return;
-  }
-
-  PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, 
topic_, client_, gsl::narrow(qos_), retain_);
-  session->read(flowFile, std::ref(callback));
-  if (callback.status_ < 0) {
-logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
-session->transfer(flowFile, Failure);
+  const auto topic = getTopic(context, flow_file);
+  PublishMQTT::ReadCallback callback(this, flow_file, topic, 
getContentType(context, flow_file));
+  session->read(flow_file, std::ref(callback));
+  if (!callback.getSuccessStatus()) {
+logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on 
broker %s", flow_file->getUUIDStr(), topic, uri_);

Review Comment:
   Done, thanks for noticing it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-13 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1021156290


##
extensions/mqtt/processors/ConsumeMQTT.h:
##
@@ -68,63 +77,82 @@ class ConsumeMQTT : public 
processors::AbstractMQTTProcessor {
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
-  void onSchedule(const std::shared_ptr , const 
std::shared_ptr ) override;
-  void onTrigger(const std::shared_ptr , const 
std::shared_ptr ) override;
+  static constexpr const char* const MQTT_TOPIC_ATTRIBUTE = "mqtt.topic";
+  static constexpr const char* const MQTT_BROKER_ATTRIBUTE = "mqtt.broker";
+
+  void readProperties(const std::shared_ptr& context) 
override;
+  void onTriggerImpl(const std::shared_ptr& context, 
const std::shared_ptr& session) override;
   void initialize() override;
 
  private:
-  struct MQTTMessageDeleter {
-void operator()(MQTTAsync_message* message) {
-  MQTTAsync_freeMessage();
+  class WriteCallback {
+   public:
+explicit WriteCallback(const SmartMessage& message)
+  : message_(message) {
 }
-  };
 
-  void getReceivedMQTTMsg(std::deque>& msg_queue) {
-std::unique_ptr message;
-while (queue_.try_dequeue(message)) {
-  msg_queue.push_back(std::move(message));
+int64_t operator() (const std::shared_ptr& stream);
+
+[[nodiscard]] bool getSuccessStatus() const {
+  return success_status_;
 }
-  }
 
-  // MQTT async callback
-  static void subscriptionSuccess(void* context, MQTTAsync_successData* 
response) {
-auto* processor = reinterpret_cast(context);
-processor->onSubscriptionSuccess(response);
-  }
+   private:
+const SmartMessage& message_;
+bool success_status_ = true;
+  };
 
-  // MQTT async callback
-  static void subscriptionFailure(void* context, MQTTAsync_failureData* 
response) {
-auto* processor = reinterpret_cast(context);
-processor->onSubscriptionFailure(response);
-  }
+  std::queue getReceivedMqttMessages();
+
+  // MQTT static async callbacks, calling their non-static counterparts with 
context being pointer to "this"
+  static void subscriptionSuccess(void* context, MQTTAsync_successData* 
response);
+  static void subscriptionSuccess5(void* context, MQTTAsync_successData5* 
response);
+  static void subscriptionFailure(void* context, MQTTAsync_failureData* 
response);
+  static void subscriptionFailure5(void* context, MQTTAsync_failureData5* 
response);
 
-  void onSubscriptionSuccess(MQTTAsync_successData* /*response*/) {
-logger_->log_info("Successfully subscribed to MQTT topic %s on broker %s", 
topic_, uri_);
+  // MQTT non-static async callbacks
+  void onSubscriptionSuccess();
+  void onSubscriptionFailure(MQTTAsync_failureData* response);
+  void onSubscriptionFailure5(MQTTAsync_failureData5* response);
+  void onMessageReceived(SmartMessage smart_message) override;
+
+  void enqueueReceivedMQTTMsg(SmartMessage message);
+  void startupClient() override;
+  void checkProperties() override;
+  void checkBrokerLimitsImpl() override;
+
+  void resolveTopicFromAlias(const std::unique_ptr& message, std::string& topic);
+
+  bool getCleanSession() const override {
+return clean_session_;
   }
 
-  void onSubscriptionFailure(MQTTAsync_failureData* response) {
-logger_->log_error("Subscription failed on topic %s to MQTT broker %s 
(%d)", topic_, uri_, response->code);
-if (response->message != nullptr) {
-  logger_->log_error("Detailed reason for subscription failure: %s", 
response->message);
-}
+  bool getCleanStart() const override {
+return clean_start_;
   }
 
-  bool getCleanSession() const override {
-return cleanSession_;
+  std::chrono::seconds getSessionExpiryInterval() const override {
+return session_expiry_interval_;
   }
 
-  void onMessageReceived(char* topic_name, int /*topic_len*/, 
MQTTAsync_message* message) override;
+  void putUserPropertiesAsAttributes(const SmartMessage& message, const 
std::shared_ptr& flow_file, const 
std::shared_ptr& session) const;
+  void fillAttributeFromContentType(const SmartMessage& message, const 
std::shared_ptr& flow_file, const 
std::shared_ptr& session) const;
 
-  void enqueueReceivedMQTTMsg(std::unique_ptr message);
+  void setMqtt5ConnectOptionsImpl(MQTTProperties& connect_props) const 
override;
 
-  bool startupClient() override;
+  std::string topic_;
+  bool clean_session_ = true;
+  bool clean_start_ = true;
+  std::chrono::seconds session_expiry_interval_{0};
+  uint64_t max_queue_size_ = 1000;
+  std::string attribute_from_content_type_;
 
-  void checkProperties() override;
+  uint16_t topic_alias_maximum_{0};
+  uint16_t receive_maximum_{65535};

Review Comment:
   I called it `MQTT_MAX_RECEIVE_MAXIMUM`, because this is the maximum possible 
value for the Receive Maximum MQTT property. This could either be defined by 
the client or the broker and it means the maximum number of unacknowledged 
messages they can tolerate at a time.



-- 
This is an 

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-13 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1021135374


##
extensions/mqtt/processors/PublishMQTT.cpp:
##
@@ -34,76 +34,267 @@ void PublishMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void PublishMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void PublishMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (!context->getProperty(Topic).has_value()) {
+logger_->log_error("PublishMQTT: could not get Topic");
+  }
+
   if (const auto retain_opt = context->getProperty(Retain)) {
 retain_ = *retain_opt;
   }
   logger_->log_debug("PublishMQTT: Retain [%d]", retain_);
 
-  AbstractMQTTProcessor::onSchedule(context, factory);
+  if (const auto message_expiry_interval = 
context->getProperty(MessageExpiryInterval)) {
+message_expiry_interval_ = 
std::chrono::duration_cast(message_expiry_interval->getMilliseconds());
+logger_->log_debug("PublishMQTT: MessageExpiryInterval [%" PRId64 "] s", 
int64_t{message_expiry_interval_->count()});
+  }
+
+  in_flight_message_counter_.setMqttVersion(mqtt_version_);
+  in_flight_message_counter_.setQoS(qos_);
 }
 
-void PublishMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+void PublishMQTT::onTriggerImpl(const std::shared_ptr& 
context, const std::shared_ptr ) {
+  std::shared_ptr flow_file = session->get();
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not publish to MQTT broker because disconnected 
to %s", uri_);
-yield();
+  if (!flow_file) {
 return;
   }
 
-  std::shared_ptr flowFile = session->get();
+  // broker's Receive Maximum can change after reconnect
+  in_flight_message_counter_.setMax(broker_receive_maximum_.value_or(65535));
 
-  if (!flowFile) {
-return;
-  }
-
-  PublishMQTT::ReadCallback callback(this, flowFile->getSize(), max_seg_size_, 
topic_, client_, gsl::narrow(qos_), retain_);
-  session->read(flowFile, std::ref(callback));
-  if (callback.status_ < 0) {
-logger_->log_error("Failed to send flow to MQTT topic %s", topic_);
-session->transfer(flowFile, Failure);
+  const auto topic = getTopic(context, flow_file);
+  PublishMQTT::ReadCallback callback(this, flow_file, topic, 
getContentType(context, flow_file));
+  session->read(flow_file, std::ref(callback));
+  if (!callback.getSuccessStatus()) {
+logger_->log_error("Failed to send flow file [%s] to MQTT topic '%s' on 
broker %s", flow_file->getUUIDStr(), topic, uri_);
+session->transfer(flow_file, Failure);
   } else {
-logger_->log_debug("Sent flow with length %d to MQTT topic %s", 
callback.read_size_, topic_);
-session->transfer(flowFile, Success);
+logger_->log_debug("Sent flow file [%s] with length %d to MQTT topic '%s' 
on broker %s", flow_file->getUUIDStr(), callback.getReadSize(), topic, uri_);
+session->transfer(flow_file, Success);
   }
 }
 
 int64_t PublishMQTT::ReadCallback::operator()(const 
std::shared_ptr& stream) {
-  if (flow_size_ < max_seg_size_)
-max_seg_size_ = flow_size_;
-  gsl_Expects(max_seg_size_ < 
gsl::narrow(std::numeric_limits::max()));
-  std::vector buffer(max_seg_size_);
+  if (flow_file_->getSize() > 268'435'455) {
+processor_->logger_->log_error("Sending message failed because MQTT limit 
maximum packet size [268'435'455] is exceeded by FlowFile of [%" PRIu64 "]", 
flow_file_->getSize());
+success_status_ = false;
+return -1;
+  }
+
+  if (processor_->maximum_packet_size_.has_value() && flow_file_->getSize() > 
*(processor_->maximum_packet_size_)) {
+processor_->logger_->log_error("Sending message failed because 
broker-requested maximum packet size [%" PRIu32 "] is exceeded by FlowFile of 
[%" PRIu64 "]",
+   *processor_->maximum_packet_size_, 
flow_file_->getSize());
+success_status_ = false;
+return -1;
+  }
+
+  std::vector buffer(flow_file_->getSize());
   read_size_ = 0;
-  status_ = 0;
-  while (read_size_ < flow_size_) {
-// MQTTClient_message::payloadlen is int, so we can't handle 2GB+
-const auto readRet = stream->read(buffer);
+  success_status_ = true;
+  while (read_size_ < flow_file_->getSize()) {

Review Comment:
   Is it guaranteed that `InputStream::read()` will read through all the data? 
This interface method has many implementations. If any of them works similarly 
to the `read` system call, then it has to be called in a loop, checking that we 
got all the data we wanted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-11 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1020323673


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,325 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [, _status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message);
+session->write(flow_file, write_callback);
+if (!write_callback.getSuccessStatus()) {
+  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
flow_file->getUUIDStr());

Review Comment:
   Good points, done both!



-- 
This is an automated message 

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-11 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1020313074


##
extensions/mqtt/processors/ConsumeMQTT.cpp:
##
@@ -34,111 +34,325 @@ void ConsumeMQTT::initialize() {
   setSupportedRelationships(relationships());
 }
 
-void ConsumeMQTT::enqueueReceivedMQTTMsg(std::unique_ptr message) {
-  if (queue_.size_approx() >= maxQueueSize_) {
-logger_->log_warn("MQTT queue full");
+void ConsumeMQTT::enqueueReceivedMQTTMsg(SmartMessage message) {
+  if (queue_.size_approx() >= max_queue_size_) {
+logger_->log_error("MQTT queue full");
 return;
   }
 
-  if (gsl::narrow(message->payloadlen) > max_seg_size_) {
-logger_->log_debug("MQTT message was truncated while enqueuing, original 
length: %d", message->payloadlen);
-message->payloadlen = gsl::narrow(max_seg_size_);
-  }
-
-  logger_->log_debug("enqueuing MQTT message with length %d", 
message->payloadlen);
+  logger_->log_debug("enqueuing MQTT message with length %d", 
message.contents->payloadlen);
   queue_.enqueue(std::move(message));
 }
 
-void ConsumeMQTT::onSchedule(const std::shared_ptr 
, const std::shared_ptr ) {
+void ConsumeMQTT::readProperties(const std::shared_ptr& 
context) {
+  if (auto value = context->getProperty(Topic)) {
+topic_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic [%s]", topic_);
+
   if (const auto value = context->getProperty(CleanSession)) {
-cleanSession_ = *value;
-logger_->log_debug("ConsumeMQTT: CleanSession [%d]", cleanSession_);
+clean_session_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: CleanSession [%d]", clean_session_);
+
+  if (const auto value = context->getProperty(CleanStart)) {
+clean_start_ = *value;
+  }
+  logger_->log_debug("ConsumeMQTT: CleanStart [%d]", clean_start_);
+
+  if (const auto session_expiry_interval = 
context->getProperty(SessionExpiryInterval)) {
+session_expiry_interval_ = 
std::chrono::duration_cast(session_expiry_interval->getMilliseconds());
+  }
+  logger_->log_debug("ConsumeMQTT: SessionExpiryInterval [%" PRId64 "] s", 
int64_t{session_expiry_interval_.count()});
 
   if (const auto value = 
context->getProperty(QueueBufferMaxMessage)) {
-maxQueueSize_ = *value;
-logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
maxQueueSize_);
+max_queue_size_ = *value;
   }
+  logger_->log_debug("ConsumeMQTT: Queue Max Message [%" PRIu64 "]", 
max_queue_size_);
 
-  // this connects to broker, so properties of this processor must be read 
before
-  AbstractMQTTProcessor::onSchedule(context, factory);
-}
+  if (auto value = context->getProperty(AttributeFromContentType)) {
+attribute_from_content_type_ = std::move(*value);
+  }
+  logger_->log_debug("ConsumeMQTT: Attribute From Content Type [%s]", 
attribute_from_content_type_);
 
-void ConsumeMQTT::onTrigger(const std::shared_ptr& 
/*context*/, const std::shared_ptr ) {
-  // reconnect if needed
-  reconnect();
+  if (const auto topic_alias_maximum = 
context->getProperty(TopicAliasMaximum)) {
+topic_alias_maximum_ = gsl::narrow(*topic_alias_maximum);
+  }
+  logger_->log_debug("ConsumeMQTT: Topic Alias Maximum [%" PRIu16 "]", 
topic_alias_maximum_);
 
-  if (!MQTTAsync_isConnected(client_)) {
-logger_->log_error("Could not consume from MQTT broker because 
disconnected to %s", uri_);
-yield();
-return;
+  if (const auto receive_maximum = 
context->getProperty(ReceiveMaximum)) {
+receive_maximum_ = gsl::narrow(*receive_maximum);
   }
+  logger_->log_debug("ConsumeMQTT: Receive Maximum [%" PRIu16 "]", 
receive_maximum_);
+}
 
-  std::deque> msg_queue;
-  getReceivedMQTTMsg(msg_queue);
+void ConsumeMQTT::onTriggerImpl(const std::shared_ptr& 
/*context*/, const std::shared_ptr& session) {
+  std::queue msg_queue = getReceivedMqttMessages();
   while (!msg_queue.empty()) {
 const auto& message = msg_queue.front();
-std::shared_ptr processFlowFile = session->create();
-int write_status{};
-session->write(processFlowFile, [, _status](const 
std::shared_ptr& stream) -> int64_t {
-  if (message->payloadlen < 0) {
-write_status = -1;
-return -1;
-  }
-  const auto len = 
stream->write(reinterpret_cast(message->payload), 
gsl::narrow(message->payloadlen));
-  if (io::isError(len)) {
-write_status = -1;
-return -1;
-  }
-  return gsl::narrow(len);
-});
-if (write_status < 0) {
-  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
processFlowFile->getUUIDStr());
-  session->remove(processFlowFile);
+std::shared_ptr flow_file = session->create();
+WriteCallback write_callback(message);
+session->write(flow_file, write_callback);
+if (!write_callback.getSuccessStatus()) {
+  logger_->log_error("ConsumeMQTT fail for the flow with UUID %s", 
flow_file->getUUIDStr());
+  session->remove(flow_file);
 } else {
-  

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-11 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1020294046


##
extensions/mqtt/processors/ConsumeMQTT.h:
##
@@ -35,27 +36,35 @@
 
 namespace org::apache::nifi::minifi::processors {
 
-#define MQTT_TOPIC_ATTRIBUTE "mqtt.topic"
-#define MQTT_BROKER_ATTRIBUTE "mqtt.broker"
-
 class ConsumeMQTT : public processors::AbstractMQTTProcessor {
  public:
   explicit ConsumeMQTT(const std::string& name, const utils::Identifier& uuid 
= {})
   : processors::AbstractMQTTProcessor(name, uuid) {
-maxQueueSize_ = 100;
   }
 
   EXTENSIONAPI static constexpr const char* Description = "This Processor gets 
the contents of a FlowFile from a MQTT broker for a specified topic. "
   "The the payload of the MQTT message becomes content of a FlowFile";
 
+  EXTENSIONAPI static const core::Property Topic;
   EXTENSIONAPI static const core::Property CleanSession;
+  EXTENSIONAPI static const core::Property CleanStart;
+  EXTENSIONAPI static const core::Property SessionExpiryInterval;
   EXTENSIONAPI static const core::Property QueueBufferMaxMessage;
+  EXTENSIONAPI static const core::Property AttributeFromContentType;
+  EXTENSIONAPI static const core::Property TopicAliasMaximum;
+  EXTENSIONAPI static const core::Property ReceiveMaximum;
 
   static auto properties() {
-return utils::array_cat(AbstractMQTTProcessor::properties(), std::array{
+return utils::array_cat(AbstractMQTTProcessor::basicProperties(), 
std::array{

Review Comment:
   Yes, the order looks better; it might make sense when shown on a web UI.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-11 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1020292687


##
extensions/mqtt/processors/ConsumeMQTT.h:
##
@@ -68,63 +77,82 @@ class ConsumeMQTT : public 
processors::AbstractMQTTProcessor {
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
-  void onSchedule(const std::shared_ptr , const 
std::shared_ptr ) override;
-  void onTrigger(const std::shared_ptr , const 
std::shared_ptr ) override;
+  static constexpr const char* const MQTT_TOPIC_ATTRIBUTE = "mqtt.topic";
+  static constexpr const char* const MQTT_BROKER_ATTRIBUTE = "mqtt.broker";
+
+  void readProperties(const std::shared_ptr& context) 
override;
+  void onTriggerImpl(const std::shared_ptr& context, 
const std::shared_ptr& session) override;
   void initialize() override;
 
  private:
-  struct MQTTMessageDeleter {
-void operator()(MQTTAsync_message* message) {
-  MQTTAsync_freeMessage();
+  class WriteCallback {
+   public:
+explicit WriteCallback(const SmartMessage& message)
+  : message_(message) {
 }
-  };
 
-  void getReceivedMQTTMsg(std::deque>& msg_queue) {
-std::unique_ptr message;
-while (queue_.try_dequeue(message)) {
-  msg_queue.push_back(std::move(message));
+int64_t operator() (const std::shared_ptr& stream);
+
+[[nodiscard]] bool getSuccessStatus() const {
+  return success_status_;
 }
-  }
 
-  // MQTT async callback
-  static void subscriptionSuccess(void* context, MQTTAsync_successData* 
response) {
-auto* processor = reinterpret_cast(context);
-processor->onSubscriptionSuccess(response);
-  }
+   private:
+const SmartMessage& message_;
+bool success_status_ = true;
+  };
 
-  // MQTT async callback
-  static void subscriptionFailure(void* context, MQTTAsync_failureData* 
response) {
-auto* processor = reinterpret_cast(context);
-processor->onSubscriptionFailure(response);
-  }
+  std::queue getReceivedMqttMessages();
+
+  // MQTT static async callbacks, calling their non-static counterparts with 
context being pointer to "this"
+  static void subscriptionSuccess(void* context, MQTTAsync_successData* 
response);
+  static void subscriptionSuccess5(void* context, MQTTAsync_successData5* 
response);
+  static void subscriptionFailure(void* context, MQTTAsync_failureData* 
response);
+  static void subscriptionFailure5(void* context, MQTTAsync_failureData5* 
response);
 
-  void onSubscriptionSuccess(MQTTAsync_successData* /*response*/) {
-logger_->log_info("Successfully subscribed to MQTT topic %s on broker %s", 
topic_, uri_);
+  // MQTT non-static async callbacks
+  void onSubscriptionSuccess();
+  void onSubscriptionFailure(MQTTAsync_failureData* response);
+  void onSubscriptionFailure5(MQTTAsync_failureData5* response);
+  void onMessageReceived(SmartMessage smart_message) override;
+
+  void enqueueReceivedMQTTMsg(SmartMessage message);
+  void startupClient() override;
+  void checkProperties() override;
+  void checkBrokerLimitsImpl() override;
+
+  void resolveTopicFromAlias(const std::unique_ptr& message, std::string& topic);
+
+  bool getCleanSession() const override {
+return clean_session_;
   }
 
-  void onSubscriptionFailure(MQTTAsync_failureData* response) {
-logger_->log_error("Subscription failed on topic %s to MQTT broker %s 
(%d)", topic_, uri_, response->code);
-if (response->message != nullptr) {
-  logger_->log_error("Detailed reason for subscription failure: %s", 
response->message);
-}
+  bool getCleanStart() const override {
+return clean_start_;
   }
 
-  bool getCleanSession() const override {
-return cleanSession_;
+  std::chrono::seconds getSessionExpiryInterval() const override {
+return session_expiry_interval_;
   }
 
-  void onMessageReceived(char* topic_name, int /*topic_len*/, 
MQTTAsync_message* message) override;
+  void putUserPropertiesAsAttributes(const SmartMessage& message, const 
std::shared_ptr& flow_file, const 
std::shared_ptr& session) const;
+  void fillAttributeFromContentType(const SmartMessage& message, const 
std::shared_ptr& flow_file, const 
std::shared_ptr& session) const;
 
-  void enqueueReceivedMQTTMsg(std::unique_ptr message);
+  void setMqtt5ConnectOptionsImpl(MQTTProperties& connect_props) const 
override;
 
-  bool startupClient() override;
+  std::string topic_;
+  bool clean_session_ = true;
+  bool clean_start_ = true;
+  std::chrono::seconds session_expiry_interval_{0};
+  uint64_t max_queue_size_ = 1000;
+  std::string attribute_from_content_type_;
 
-  void checkProperties() override;
+  uint16_t topic_alias_maximum_{0};
+  uint16_t receive_maximum_{65535};

Review Comment:
   It's a match; however, this is defined not by C++, but by MQTT.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For 

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-10 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019320537


##
extensions/mqtt/processors/ConsumeMQTT.h:
##
@@ -125,14 +119,14 @@ class ConsumeMQTT : public 
processors::AbstractMQTTProcessor {
   void onSubscriptionSuccess();
   void onSubscriptionFailure(MQTTAsync_failureData* response);
   void onSubscriptionFailure5(MQTTAsync_failureData5* response);
-  void onMessageReceived(char* topic_name, int /*topic_len*/, 
MQTTAsync_message* message) override;
+  void onMessageReceived(std::string topic, std::unique_ptr message) override;

Review Comment:
   Moved struct definition to `AbstractMQTTProcessor`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-10 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019200246


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -163,23 +200,261 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, _opts);
+  if (MQTTAsync_isConnected(client_)) {
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+return;
+  }
+  const int ret = MQTTAsync_connect(client_, _opts);
+  MQTTProperties_free(_props);
   if (ret != MQTTASYNC_SUCCESS) {
-logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
+return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+property.value.integer4 = 
gsl::narrow(getSessionExpiryInterval().count());
+MQTTProperties_add(_props, );
+  }
+
+  if (!last_will_content_type_.empty()) {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+property.value.data.len = last_will_content_type_.length();
+property.value.data.data = 
const_cast(last_will_content_type_.data());
+MQTTProperties_add(_props, );
+  }
+
+  conn_opts.willProperties = _props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const 
std::shared_ptr& context, const 
std::shared_ptr& session) {
+  // read lock
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+// we are shutting down
+return;
+  }
+
+  // reconnect if needed
+  reconnect();
+
+  if (!MQTTAsync_isConnected(client_)) {
+logger_->log_error("Could not work with MQTT broker because disconnected 
to %s", uri_);
+yield();
+return;
+  }
+
+  onTriggerImpl(context, session);
 }
 
 void AbstractMQTTProcessor::freeResources() {
-  if (client_ && MQTTAsync_isConnected(client_)) {
-MQTTAsync_disconnectOptions disconnect_options = 
MQTTAsync_disconnectOptions_initializer;
-disconnect_options.context = this;
-disconnect_options.onSuccess = disconnectionSuccess;
-disconnect_options.onFailure = disconnectionFailure;
-disconnect_options.timeout = 
gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count());
-MQTTAsync_disconnect(client_, _options);
+  // write lock
+  std::lock_guard client_lock{client_mutex_};
+
+  if (!client_) {
+return;
   }
-  if (client_) {
-MQTTAsync_destroy(_);
+
+  disconnect();
+
+  MQTTAsync_destroy(_);
+}
+
+void AbstractMQTTProcessor::disconnect() {
+  if (!MQTTAsync_isConnected(client_)) {
+return;
+  }
+
+  MQTTAsync_disconnectOptions disconnect_options = 
MQTTAsync_disconnectOptions_initializer;
+  std::packaged_task disconnect_finished_task(
+  [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* 
success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* 
failure_data_5) {
+onDisconnectFinished(success_data, success_data_5, failure_data, 
failure_data_5);
+  });
+  disconnect_options.context = _finished_task;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+disconnect_options.onSuccess5 = connectionSuccess5;
+disconnect_options.onFailure5 = connectionFailure5;
+  } else {
+disconnect_options.onSuccess = connectionSuccess;
+disconnect_options.onFailure = connectionFailure;
+  }
+
+  disconnect_options.timeout = 
gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count());
+
+  const int ret = MQTTAsync_disconnect(client_, _options);
+  if (ret != MQTTASYNC_SUCCESS) {
+logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with 
error code [%d]", uri_, ret);
+return;
+  }
+
+  // wait until connection succeeds or fails
+  disconnect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) {
+  auto readProperty = [response] (MQTTPropertyCodes property_code, auto& 
out_var) {
+// defined by Paho MQTT C library
+static const int failure_code = -999;
+
+const int value = MQTTProperties_getNumericValue(>properties, 
property_code);
+if (value != failure_code) {
+  if constexpr (std::is_same_v&>) {
+out_var = std::chrono::seconds(value);
+  } else {
+out_var = gsl::narrow::type::value_type>(value);

Review Comment:
   Yes, we can!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go 

[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-10 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019199181


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -163,23 +200,261 @@ void AbstractMQTTProcessor::reconnect() {
   }
 
   logger_->log_info("Reconnecting to %s", uri_);
-  int ret = MQTTAsync_connect(client_, _opts);
+  if (MQTTAsync_isConnected(client_)) {
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
+return;
+  }
+  const int ret = MQTTAsync_connect(client_, _opts);
+  MQTTProperties_free(_props);
   if (ret != MQTTASYNC_SUCCESS) {
-logger_->log_error("Failed to reconnect to MQTT broker %s (%d)", uri_, 
ret);
+logger_->log_error("MQTTAsync_connect failed to MQTT broker %s with error 
code [%d]", uri_, ret);
+return;
   }
+
+  // wait until connection succeeds or fails
+  connect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setMqtt5ConnectOptions(MQTTAsync_connectOptions& 
conn_opts, MQTTProperties& connect_props, MQTTProperties& will_props) const {
+  conn_opts.cleanstart = getCleanStart();
+
+  {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_SESSION_EXPIRY_INTERVAL;
+property.value.integer4 = 
gsl::narrow(getSessionExpiryInterval().count());
+MQTTProperties_add(_props, );
+  }
+
+  if (!last_will_content_type_.empty()) {
+MQTTProperty property;
+property.identifier = MQTTPROPERTY_CODE_CONTENT_TYPE;
+property.value.data.len = last_will_content_type_.length();
+property.value.data.data = 
const_cast(last_will_content_type_.data());
+MQTTProperties_add(_props, );
+  }
+
+  conn_opts.willProperties = _props;
+
+  setMqtt5ConnectOptionsImpl(connect_props);
+}
+
+void AbstractMQTTProcessor::onTrigger(const 
std::shared_ptr& context, const 
std::shared_ptr& session) {
+  // read lock
+  std::shared_lock client_lock{client_mutex_};
+  if (client_ == nullptr) {
+// we are shutting down
+return;
+  }
+
+  // reconnect if needed
+  reconnect();
+
+  if (!MQTTAsync_isConnected(client_)) {
+logger_->log_error("Could not work with MQTT broker because disconnected 
to %s", uri_);
+yield();
+return;
+  }
+
+  onTriggerImpl(context, session);
 }
 
 void AbstractMQTTProcessor::freeResources() {
-  if (client_ && MQTTAsync_isConnected(client_)) {
-MQTTAsync_disconnectOptions disconnect_options = 
MQTTAsync_disconnectOptions_initializer;
-disconnect_options.context = this;
-disconnect_options.onSuccess = disconnectionSuccess;
-disconnect_options.onFailure = disconnectionFailure;
-disconnect_options.timeout = 
gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count());
-MQTTAsync_disconnect(client_, _options);
+  // write lock
+  std::lock_guard client_lock{client_mutex_};
+
+  if (!client_) {
+return;
   }
-  if (client_) {
-MQTTAsync_destroy(_);
+
+  disconnect();
+
+  MQTTAsync_destroy(_);
+}
+
+void AbstractMQTTProcessor::disconnect() {
+  if (!MQTTAsync_isConnected(client_)) {
+return;
+  }
+
+  MQTTAsync_disconnectOptions disconnect_options = 
MQTTAsync_disconnectOptions_initializer;
+  std::packaged_task disconnect_finished_task(
+  [this] (MQTTAsync_successData* success_data, MQTTAsync_successData5* 
success_data_5, MQTTAsync_failureData* failure_data, MQTTAsync_failureData5* 
failure_data_5) {
+onDisconnectFinished(success_data, success_data_5, failure_data, 
failure_data_5);
+  });
+  disconnect_options.context = _finished_task;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+disconnect_options.onSuccess5 = connectionSuccess5;
+disconnect_options.onFailure5 = connectionFailure5;
+  } else {
+disconnect_options.onSuccess = connectionSuccess;
+disconnect_options.onFailure = connectionFailure;
+  }
+
+  disconnect_options.timeout = 
gsl::narrow(std::chrono::milliseconds{connection_timeout_}.count());
+
+  const int ret = MQTTAsync_disconnect(client_, _options);
+  if (ret != MQTTASYNC_SUCCESS) {
+logger_->log_error("MQTTAsync_disconnect failed to MQTT broker %s with 
error code [%d]", uri_, ret);
+return;
+  }
+
+  // wait until connection succeeds or fails
+  disconnect_finished_task.get_future().get();
+}
+
+void AbstractMQTTProcessor::setBrokerLimits(MQTTAsync_successData5* response) {
+  auto readProperty = [response] (MQTTPropertyCodes property_code, auto& 
out_var) {
+// defined by Paho MQTT C library
+static const int failure_code = -999;

Review Comment:
   Thanks, did it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-10 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019193763


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -137,19 +148,45 @@ void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptrlog_error("MQTT client is not existing while trying to 
reconnect");
-return;
+throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 
client is not existing while trying to reconnect");
   }
   if (MQTTAsync_isConnected(client_)) {
-logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
 return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
+
+  MQTTAsync_connectOptions conn_opts;
+  MQTTProperties connect_props = MQTTProperties_initializer;
+  MQTTProperties will_props = MQTTProperties_initializer;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+conn_opts = MQTTAsync_connectOptions_initializer5;
+conn_opts.onSuccess5 = connectionSuccess5;
+conn_opts.onFailure5 = connectionFailure5;
+conn_opts.connectProperties = &connect_props;
+  } else {
+conn_opts = MQTTAsync_connectOptions_initializer;
+conn_opts.onSuccess = connectionSuccess;
+conn_opts.onFailure = connectionFailure;
+  }
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+conn_opts.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+conn_opts.MQTTVersion = MQTTVERSION_3_1_1;

Review Comment:
   `MQTTAsync_connectOptions_initializer5` does it on line 163.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-10 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019193763


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -137,19 +148,45 @@ void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptrlog_error("MQTT client is not existing while trying to 
reconnect");
-return;
+throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "MQTT 
client is not existing while trying to reconnect");
   }
   if (MQTTAsync_isConnected(client_)) {
-logger_->log_info("Already connected to %s, no need to reconnect", uri_);
+logger_->log_debug("Already connected to %s, no need to reconnect", uri_);
 return;
   }
-  MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
+
+  MQTTAsync_connectOptions conn_opts;
+  MQTTProperties connect_props = MQTTProperties_initializer;
+  MQTTProperties will_props = MQTTProperties_initializer;
+
+  if (mqtt_version_.value() == MqttVersions::V_5_0) {
+conn_opts = MQTTAsync_connectOptions_initializer5;
+conn_opts.onSuccess5 = connectionSuccess5;
+conn_opts.onFailure5 = connectionFailure5;
+conn_opts.connectProperties = &connect_props;
+  } else {
+conn_opts = MQTTAsync_connectOptions_initializer;
+conn_opts.onSuccess = connectionSuccess;
+conn_opts.onFailure = connectionFailure;
+  }
+
+  if (mqtt_version_.value() == MqttVersions::V_3_1_0) {
+conn_opts.MQTTVersion = MQTTVERSION_3_1;
+  } else if (mqtt_version_.value() == MqttVersions::V_3_1_1) {
+conn_opts.MQTTVersion = MQTTVERSION_3_1_1;

Review Comment:
   `MQTTAsync_connectOptions_initializer5` does it on line 163.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-10 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019185396


##
extensions/mqtt/processors/AbstractMQTTProcessor.cpp:
##
@@ -20,51 +20,49 @@
 #include 
 
 #include "utils/StringUtils.h"
+#include "utils/ProcessorConfigUtils.h"
 #include "core/ProcessContext.h"
 
 namespace org::apache::nifi::minifi::processors {
 
 void AbstractMQTTProcessor::onSchedule(const 
std::shared_ptr , const 
std::shared_ptr& /*factory*/) {
   if (auto value = context->getProperty(BrokerURI)) {
 uri_ = std::move(*value);
-logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_);
   }
+  logger_->log_debug("AbstractMQTTProcessor: BrokerURI [%s]", uri_);
+
+  mqtt_version_ = 
MqttVersions{utils::parsePropertyWithAllowableValuesOrThrow(*context, 
MqttVersion.getName(), MqttVersions::values())};

Review Comment:
   Thanks, used it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [nifi-minifi-cpp] adam-markovics commented on a diff in pull request #1432: MINIFICPP-1840 - Add support for MQTT 5

2022-11-10 Thread GitBox


adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1019170650


##
extensions/librdkafka/PublishKafka.cpp:
##
@@ -266,8 +266,8 @@ class ReadCallback {
 status_ = 0;
 called_ = true;
 
-gsl_Expects(max_seg_size_ != 0 || (flow_size_ == 0 && "max_seg_size_ == 0 
implies flow_size_ == 0"));
-// ^^ therefore checking max_seg_size_ == 0 handles both division by zero 
and flow_size_ == 0 cases
+gsl_Expects(max_seg_size_ != 0 || (flow_size_ == 0 && "max_message_size_ 
== 0 implies flow_size_ == 0"));
+// ^^ therefore checking max_message_size_ == 0 handles both division by 
zero and flow_size_ == 0 cases

Review Comment:
   I think it was. I used CLion's rename feature and it affected this file by 
accident. Restoring.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org