adam-markovics commented on code in PR #1432: URL: https://github.com/apache/nifi-minifi-cpp/pull/1432#discussion_r1020292687
########## extensions/mqtt/processors/ConsumeMQTT.h: ########## @@ -68,63 +77,82 @@ class ConsumeMQTT : public processors::AbstractMQTTProcessor { ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS - void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &factory) override; - void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override; + static constexpr const char* const MQTT_TOPIC_ATTRIBUTE = "mqtt.topic"; + static constexpr const char* const MQTT_BROKER_ATTRIBUTE = "mqtt.broker"; + + void readProperties(const std::shared_ptr<core::ProcessContext>& context) override; + void onTriggerImpl(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) override; void initialize() override; private: - struct MQTTMessageDeleter { - void operator()(MQTTAsync_message* message) { - MQTTAsync_freeMessage(&message); + class WriteCallback { + public: + explicit WriteCallback(const SmartMessage& message) + : message_(message) { } - }; - void getReceivedMQTTMsg(std::deque<std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>>& msg_queue) { - std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message; - while (queue_.try_dequeue(message)) { - msg_queue.push_back(std::move(message)); + int64_t operator() (const std::shared_ptr<io::OutputStream>& stream); + + [[nodiscard]] bool getSuccessStatus() const { + return success_status_; } - } - // MQTT async callback - static void subscriptionSuccess(void* context, MQTTAsync_successData* response) { - auto* processor = reinterpret_cast<ConsumeMQTT*>(context); - processor->onSubscriptionSuccess(response); - } + private: + const SmartMessage& message_; + bool success_status_ = true; + }; - // MQTT async callback - static void subscriptionFailure(void* context, MQTTAsync_failureData* response) { - auto* processor = reinterpret_cast<ConsumeMQTT*>(context); - processor->onSubscriptionFailure(response); - } + std::queue<SmartMessage> getReceivedMqttMessages(); + + // MQTT static async callbacks, calling their non-static counterparts with context being pointer to "this" + static void subscriptionSuccess(void* context, MQTTAsync_successData* response); + static void subscriptionSuccess5(void* context, MQTTAsync_successData5* response); + static void subscriptionFailure(void* context, MQTTAsync_failureData* response); + static void subscriptionFailure5(void* context, MQTTAsync_failureData5* response); - void onSubscriptionSuccess(MQTTAsync_successData* /*response*/) { - logger_->log_info("Successfully subscribed to MQTT topic %s on broker %s", topic_, uri_); + // MQTT non-static async callbacks + void onSubscriptionSuccess(); + void onSubscriptionFailure(MQTTAsync_failureData* response); + void onSubscriptionFailure5(MQTTAsync_failureData5* response); + void onMessageReceived(SmartMessage smart_message) override; + + void enqueueReceivedMQTTMsg(SmartMessage message); + void startupClient() override; + void checkProperties() override; + void checkBrokerLimitsImpl() override; + + void resolveTopicFromAlias(const std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter>& message, std::string& topic); + + bool getCleanSession() const override { + return clean_session_; } - void onSubscriptionFailure(MQTTAsync_failureData* response) { - logger_->log_error("Subscription failed on topic %s to MQTT broker %s (%d)", topic_, uri_, response->code); - if (response->message != nullptr) { - logger_->log_error("Detailed reason for subscription failure: %s", response->message); - } + bool getCleanStart() const override { + return clean_start_; } - bool getCleanSession() const override { - return cleanSession_; + std::chrono::seconds getSessionExpiryInterval() const override { + return session_expiry_interval_; } - void onMessageReceived(char* topic_name, int /*topic_len*/, MQTTAsync_message* message) override; + void putUserPropertiesAsAttributes(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const; + void fillAttributeFromContentType(const SmartMessage& message, const std::shared_ptr<core::FlowFile>& flow_file, const std::shared_ptr<core::ProcessSession>& session) const; - void enqueueReceivedMQTTMsg(std::unique_ptr<MQTTAsync_message, MQTTMessageDeleter> message); + void setMqtt5ConnectOptionsImpl(MQTTProperties& connect_props) const override; - bool startupClient() override; + std::string topic_; + bool clean_session_ = true; + bool clean_start_ = true; + std::chrono::seconds session_expiry_interval_{0}; + uint64_t max_queue_size_ = 1000; + std::string attribute_from_content_type_; - void checkProperties() override; + uint16_t topic_alias_maximum_{0}; + uint16_t receive_maximum_{65535}; Review Comment: It's match, however this is not defined by C++, but MQTT. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org