Github user minifirocks commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807078 --- Diff: extensions/mqtt/PublishMQTT.h --- @@ -0,0 +1,142 @@ +/** + * @file PublishMQTT.h + * PublishMQTT class declaration + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef __PUBLISH_MQTT_H__ +#define __PUBLISH_MQTT_H__ + +#include "FlowFileRecord.h" +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Core.h" +#include "core/Resource.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "MQTTClient.h" +#include "AbstractMQTTProcessor.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace processors { + +// PublishMQTT Class +class PublishMQTT: public processors::AbstractMQTTProcessor { +public: + // Constructor + /*! 
+ * Create a new processor + */ + explicit PublishMQTT(std::string name, uuid_t uuid = NULL) + : processors::AbstractMQTTProcessor(name, uuid), logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) { + retain_ = false; + max_seg_size_ = ULLONG_MAX; + } + // Destructor + virtual ~PublishMQTT() { + } + // Processor Name + static constexpr char const* ProcessorName = "PublishMQTT"; + // Supported Properties + static core::Property Retain; + static core::Property MaxFlowSegSize; + + // Nest Callback Class for read stream + class ReadCallback: public InputStreamCallback { + public: + ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const std::string &key, MQTTClient client, + int qos, bool retain, MQTTClient_deliveryToken &token) : + flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), client_(client), + qos_(qos), retain_(retain), token_(token) { + status_ = 0; + read_size_ = 0; + } + ~ReadCallback() { + } + int64_t process(std::shared_ptr<io::BaseStream> stream) { + if (flow_size_ < max_seg_size_) + max_seg_size_ = flow_size_; + std::vector<unsigned char> buffer; + buffer.reserve(max_seg_size_); + read_size_ = 0; + status_ = 0; + while (read_size_ < flow_size_) { + int readRet = stream->read(&buffer[0], max_seg_size_); + if (readRet < 0) { + status_ = -1; + return read_size_; + } + if (readRet > 0) { + MQTTClient_message pubmsg = MQTTClient_message_initializer; + pubmsg.payload = &buffer[0]; + pubmsg.payloadlen = readRet; + pubmsg.qos = qos_; + pubmsg.retained = retain_; + if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, &token_) != MQTTCLIENT_SUCCESS) { --- End diff -- It adds the MQTT header and calls socket write. The delivery-complete callback is for QoS. /** * This is a callback function. The client application * must provide an implementation of this function to enable asynchronous * notification of delivery of messages. The function is registered with the * client library by passing it as an argument to MQTTClient_setCallbacks(). 
* It is called by the client library after the client application has * published a message to the server. It indicates that the necessary * handshaking and acknowledgements for the requested quality of service (see * MQTTClient_message.qos) have been completed. This function is executed on a * separate thread to the one on which the client application is running. * <b>Note:</b>MQTTClient_deliveryComplete() is not called when messages are * published at QoS0. * @param context A pointer to the <i>context</i> value originally passed to * MQTTClient_setCallbacks(), which contains any application-specific context. * @param dt The ::MQTTClient_deliveryToken associated with * the published message. Applications can check that all messages have been * correctly published by matching the delivery tokens returned from calls to * MQTTClient_publish() and MQTTClient_publishMessage() with the tokens passed * to this callback. */ typedef void MQTTClient_deliveryComplete(void* context, MQTTClient_deliveryToken dt);
---