Github user minifirocks commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/228#discussion_r159807078
  
    --- Diff: extensions/mqtt/PublishMQTT.h ---
    @@ -0,0 +1,142 @@
    +/**
    + * @file PublishMQTT.h
    + * PublishMQTT class declaration
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef __PUBLISH_MQTT_H__
    +#define __PUBLISH_MQTT_H__
    +
    +#include "FlowFileRecord.h"
    +#include "core/Processor.h"
    +#include "core/ProcessSession.h"
    +#include "core/Core.h"
    +#include "core/Resource.h"
    +#include "core/Property.h"
    +#include "core/logging/LoggerConfiguration.h"
    +#include "MQTTClient.h"
    +#include "AbstractMQTTProcessor.h"
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace processors {
    +
    +// PublishMQTT Class
    +class PublishMQTT: public processors::AbstractMQTTProcessor {
    +public:
    +  // Constructor
    +  /*!
    +   * Create a new processor
    +   */
    +  explicit PublishMQTT(std::string name, uuid_t uuid = NULL)
    +    : processors::AbstractMQTTProcessor(name, uuid), 
logger_(logging::LoggerFactory<PublishMQTT>::getLogger()) {
    +    retain_ = false;
    +    max_seg_size_ = ULLONG_MAX;
    +  }
    +  // Destructor
    +  virtual ~PublishMQTT() {
    +  }
    +  // Processor Name
    +  static constexpr char const* ProcessorName = "PublishMQTT";
    +  // Supported Properties
    +  static core::Property Retain;
    +  static core::Property MaxFlowSegSize;
    +
    +  // Nest Callback Class for read stream
    +  class ReadCallback: public InputStreamCallback {
    +  public:
    +    ReadCallback(uint64_t flow_size, uint64_t max_seg_size, const 
std::string &key, MQTTClient client,
    +        int qos, bool retain, MQTTClient_deliveryToken &token) :
    +        flow_size_(flow_size), max_seg_size_(max_seg_size), key_(key), 
client_(client),
    +        qos_(qos), retain_(retain), token_(token) {
    +      status_ = 0;
    +      read_size_ = 0;
    +    }
    +    ~ReadCallback() {
    +    }
    +    int64_t process(std::shared_ptr<io::BaseStream> stream) {
    +      if (flow_size_ < max_seg_size_)
    +        max_seg_size_ = flow_size_;
    +      std::vector<unsigned char> buffer;
    +      buffer.reserve(max_seg_size_);
    +      read_size_ = 0;
    +      status_ = 0;
    +      while (read_size_ < flow_size_) {
    +        int readRet = stream->read(&buffer[0], max_seg_size_);
    +        if (readRet < 0) {
    +          status_ = -1;
    +          return read_size_;
    +        }
    +        if (readRet > 0) {
    +          MQTTClient_message pubmsg = MQTTClient_message_initializer;
    +          pubmsg.payload = &buffer[0];
    +          pubmsg.payloadlen = readRet;
    +          pubmsg.qos = qos_;
    +          pubmsg.retained = retain_;
    +          if (MQTTClient_publishMessage(client_, key_.c_str(), &pubmsg, 
&token_) != MQTTCLIENT_SUCCESS) {
    --- End diff --
    
    it add the MQTT header and call socket write.
    the deliverable callback is for QOS.
    /**
     * This is a callback function. The client application
     * must provide an implementation of this function to enable asynchronous
     * notification of delivery of messages. The function is registered with the
     * client library by passing it as an argument to MQTTClient_setCallbacks().
     * It is called by the client library after the client application has
     * published a message to the server. It indicates that the necessary
     * handshaking and acknowledgements for the requested quality of service 
(see
     * MQTTClient_message.qos) have been completed. This function is executed 
on a
     * separate thread to the one on which the client application is running.
     * <b>Note:</b>MQTTClient_deliveryComplete() is not called when messages are
     * published at QoS0.
     * @param context A pointer to the <i>context</i> value originally passed to
     * MQTTClient_setCallbacks(), which contains any application-specific 
context.
     * @param dt The ::MQTTClient_deliveryToken associated with
     * the published message. Applications can check that all messages have been
     * correctly published by matching the delivery tokens returned from calls 
to
     * MQTTClient_publish() and MQTTClient_publishMessage() with the tokens 
passed
     * to this callback.
     */
    typedef void MQTTClient_deliveryComplete(void* context, 
MQTTClient_deliveryToken dt);


---

Reply via email to