adam-markovics commented on code in PR #1432:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1020292687


##########
extensions/mqtt/processors/ConsumeMQTT.h:
##########
@@ -68,63 +77,82 @@ class ConsumeMQTT : public 
processors::AbstractMQTTProcessor {
 
   ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS
 
-  void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSessionFactory> &factory) override;
-  void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const 
std::shared_ptr<core::ProcessSession> &session) override;
+  static constexpr const char* const MQTT_TOPIC_ATTRIBUTE = "mqtt.topic";
+  static constexpr const char* const MQTT_BROKER_ATTRIBUTE = "mqtt.broker";
+
+  void readProperties(const std::shared_ptr<core::ProcessContext>& context) 
override;
+  void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, 
const std::shared_ptr<core::ProcessSession>& session) override;
   void initialize() override;
 
  private:
-  struct MQTTMessageDeleter {
-    void operator()(MQTTAsync_message* message) {
-      MQTTAsync_freeMessage(&message);
+  class WriteCallback {
+   public:
+    explicit WriteCallback(const SmartMessage& message)
+      : message_(message) {
     }
-  };
 
-  void getReceivedMQTTMsg(std::deque<std::unique_ptr<MQTTAsync_message, 
MQTTMessageDeleter>>& msg_queue) {
-    std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message;
-    while (queue_.try_dequeue(message)) {
-      msg_queue.push_back(std::move(message));
+    int64_t operator() (const std::shared_ptr<io::OutputStream>& stream);
+
+    [[nodiscard]] bool getSuccessStatus() const {
+      return success_status_;
     }
-  }
 
-  // MQTT async callback
-  static void subscriptionSuccess(void* context, MQTTAsync_successData* 
response) {
-    auto* processor = reinterpret_cast<ConsumeMQTT*>(context);
-    processor->onSubscriptionSuccess(response);
-  }
+   private:
+    const SmartMessage& message_;
+    bool success_status_ = true;
+  };
 
-  // MQTT async callback
-  static void subscriptionFailure(void* context, MQTTAsync_failureData* 
response) {
-    auto* processor = reinterpret_cast<ConsumeMQTT*>(context);
-    processor->onSubscriptionFailure(response);
-  }
+  std::queue<SmartMessage> getReceivedMqttMessages();
+
+  // MQTT static async callbacks, calling their non-static counterparts with 
context being pointer to "this"
+  static void subscriptionSuccess(void* context, MQTTAsync_successData* 
response);
+  static void subscriptionSuccess5(void* context, MQTTAsync_successData5* 
response);
+  static void subscriptionFailure(void* context, MQTTAsync_failureData* 
response);
+  static void subscriptionFailure5(void* context, MQTTAsync_failureData5* 
response);
 
-  void onSubscriptionSuccess(MQTTAsync_successData* /*response*/) {
-    logger_->log_info("Successfully subscribed to MQTT topic %s on broker %s", 
topic_, uri_);
+  // MQTT non-static async callbacks
+  void onSubscriptionSuccess();
+  void onSubscriptionFailure(MQTTAsync_failureData* response);
+  void onSubscriptionFailure5(MQTTAsync_failureData5* response);
+  void onMessageReceived(SmartMessage smart_message) override;
+
+  void enqueueReceivedMQTTMsg(SmartMessage message);
+  void startupClient() override;
+  void checkProperties() override;
+  void checkBrokerLimitsImpl() override;
+
+  void resolveTopicFromAlias(const std::unique_ptr<MQTTAsync_message, 
MQTTMessageDeleter>& message, std::string& topic);
+
+  bool getCleanSession() const override {
+    return clean_session_;
   }
 
-  void onSubscriptionFailure(MQTTAsync_failureData* response) {
-    logger_->log_error("Subscription failed on topic %s to MQTT broker %s 
(%d)", topic_, uri_, response->code);
-    if (response->message != nullptr) {
-      logger_->log_error("Detailed reason for subscription failure: %s", 
response->message);
-    }
+  bool getCleanStart() const override {
+    return clean_start_;
   }
 
-  bool getCleanSession() const override {
-    return cleanSession_;
+  std::chrono::seconds getSessionExpiryInterval() const override {
+    return session_expiry_interval_;
   }
 
-  void onMessageReceived(char* topic_name, int /*topic_len*/, 
MQTTAsync_message* message) override;
+  void putUserPropertiesAsAttributes(const SmartMessage& message, const 
std::shared_ptr<core::FlowFile>& flow_file, const 
std::shared_ptr<core::ProcessSession>& session) const;
+  void fillAttributeFromContentType(const SmartMessage& message, const 
std::shared_ptr<core::FlowFile>& flow_file, const 
std::shared_ptr<core::ProcessSession>& session) const;
 
-  void enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, 
MQTTMessageDeleter> message);
+  void setMqtt5ConnectOptionsImpl(MQTTProperties& connect_props) const 
override;
 
-  bool startupClient() override;
+  std::string topic_;
+  bool clean_session_ = true;
+  bool clean_start_ = true;
+  std::chrono::seconds session_expiry_interval_{0};
+  uint64_t max_queue_size_ = 1000;
+  std::string attribute_from_content_type_;
 
-  void checkProperties() override;
+  uint16_t topic_alias_maximum_{0};
+  uint16_t receive_maximum_{65535};

Review Comment:
   It's a match; however, this value is defined by MQTT, not by C++.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to