Github user phrocker commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159792523 --- Diff: extensions/mqtt/AbstractMQTTProcessor.cpp --- @@ -0,0 +1,158 @@ +/** + * @file AbstractMQTTProcessor.cpp + * AbstractMQTTProcessor class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "AbstractMQTTProcessor.h" +#include <stdio.h> +#include <memory> +#include <string> +#include "utils/TimeUtil.h" +#include "utils/StringUtils.h" +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +core::Property AbstractMQTTProcessor::BrokerURL("Broker URI", "The URI to use to connect to the MQTT broker", ""); +core::Property AbstractMQTTProcessor::CleanSession("Session state", "Whether to start afresh or resume previous flows. 
See the allowable value descriptions for more details", "true"); +core::Property AbstractMQTTProcessor::ClientID("Client ID", "MQTT client ID to use", ""); +core::Property AbstractMQTTProcessor::UserName("Username", "Username to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::PassWord("Password", "Password to use when connecting to the broker", ""); +core::Property AbstractMQTTProcessor::KeepLiveInterval("Keep Alive Interval", "Defines the maximum time interval between messages sent or received", "60 sec"); +core::Property AbstractMQTTProcessor::ConnectionTimeOut("Connection Timeout", "Maximum time interval the client will wait for the network connection to the MQTT server", "30 sec"); +core::Property AbstractMQTTProcessor::QOS("Quality of Service", "The Quality of Service(QoS) to send the message with. Accepts three values '0', '1' and '2'", "MQTT_QOS_0"); +core::Property AbstractMQTTProcessor::Topic("Topic", "The topic to publish the message to", ""); +core::Relationship AbstractMQTTProcessor::Success("success", "FlowFiles that are sent successfully to the destination are transferred to this relationship"); +core::Relationship AbstractMQTTProcessor::Failure("failure", "FlowFiles that failed to send to the destination are transferred to this relationship"); + +void AbstractMQTTProcessor::initialize() { + // Set the supported properties + std::set<core::Property> properties; + properties.insert(BrokerURL); + properties.insert(CleanSession); + properties.insert(ClientID); + properties.insert(UserName); + properties.insert(PassWord); + properties.insert(KeepLiveInterval); + properties.insert(ConnectionTimeOut); + properties.insert(QOS); + properties.insert(Topic); + setSupportedProperties(properties); + // Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + relationships.insert(Failure); + setSupportedRelationships(relationships); +} + +void 
AbstractMQTTProcessor::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory *sessionFactory) { + std::string value; + int64_t valInt; + value = ""; + if (context->getProperty(BrokerURL.getName(), value) && !value.empty()) { + uri_ = value; + logger_->log_info("AbstractMQTTProcessor: BrokerURL [%s]", uri_); + } + value = ""; + if (context->getProperty(ClientID.getName(), value) && !value.empty()) { + clientID_ = value; + logger_->log_info("AbstractMQTTProcessor: ClientID [%s]", clientID_); + } + value = ""; + if (context->getProperty(Topic.getName(), value) && !value.empty()) { + topic_ = value; + logger_->log_info("AbstractMQTTProcessor: Topic [%s]", topic_); + } + value = ""; + if (context->getProperty(UserName.getName(), value) && !value.empty()) { + userName_ = value; + logger_->log_info("AbstractMQTTProcessor: UserName [%s]", userName_); + } + value = ""; + if (context->getProperty(PassWord.getName(), value) && !value.empty()) { + passWord_ = value; + logger_->log_info("AbstractMQTTProcessor: PassWord [%s]", passWord_); + } + value = ""; + if (context->getProperty(CleanSession.getName(), value) && !value.empty() && + org::apache::nifi::minifi::utils::StringUtils::StringToBool(value, cleanSession_)) { + logger_->log_info("AbstractMQTTProcessor: CleanSession [%d]", cleanSession_); + } + value = ""; + if (context->getProperty(KeepLiveInterval.getName(), value) && !value.empty()) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) { + keepAliveInterval_ = valInt/1000; + logger_->log_info("AbstractMQTTProcessor: KeepLiveInterval [%d]", keepAliveInterval_); + } + } + value = ""; + if (context->getProperty(ConnectionTimeOut.getName(), value) && !value.empty()) { + core::TimeUnit unit; + if (core::Property::StringToTime(value, valInt, unit) && core::Property::ConvertTimeUnitToMS(valInt, unit, valInt)) { + connectionTimeOut_ = valInt/1000; + 
logger_->log_info("AbstractMQTTProcessor: ConnectionTimeOut [%d]", connectionTimeOut_); + } + } + value = ""; + if (context->getProperty(QOS.getName(), value) && !value.empty() && (value == MQTT_QOS_0 || value == MQTT_QOS_1 || MQTT_QOS_2) && + core::Property::StringToInt(value, valInt)) { + qos_ = valInt; + logger_->log_info("PublishKafka: QOS [%d]", qos_); + } + if (!client_) { + MQTTClient_create(&client_, uri_.c_str(), clientID_.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL); --- End diff -- What's the thread safety of this function call (MQTTClient_create)?
---