Github user arpadboda commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/437#discussion_r233499744
  
    --- Diff: extensions/coap/controllerservice/CoapConnector.h ---
    @@ -0,0 +1,207 @@
    +/**
    + *
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_COAPCONNECTOR_H_
    +#define LIBMINIFI_INCLUDE_CONTROLLERS_COAPCONNECTOR_H_
    +
    +
    +#include "core/logging/LoggerConfiguration.h"
    +#include "coap_functions.h"
    +#include "core/controller/ControllerService.h"
    +#include <memory>
    +#include <unordered_map>
    +
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
    +namespace controllers {
    +
    +/**
    + * Purpose and Justification: Controller services function as a layerable 
way to provide
    + * services to internal services. While a controller service is generally 
configured from the flow,
    + * we want to follow the open closed principle and provide CoAP services 
to other components.
    + *
    + *
    + */
    +class CoapConnectorService : public core::controller::ControllerService {
    + public:
    +
    +  /**
    +   * CoapMessage is in internal message format that is sent to and from 
consumers of this controller service.
    +   */
    +  class CoAPMessage {
    +   public:
    +
    +    explicit CoAPMessage(unsigned int code = 0)
    +        : code_(code) {
    +    }
    +
    +    explicit CoAPMessage(unsigned int code, unsigned char *data, size_t 
dataLen)
    +        : code_(code) {
    +      if (data && dataLen > 0)
    +        std::copy(data, data + dataLen, std::back_inserter(data_));
    +    }
    +
    +    CoAPMessage(const CoAPMessage &other) = delete;
    +
    +    CoAPMessage(CoAPMessage &&other) = default;
    +
    +    ~CoAPMessage() {
    +    }
    +
    +    size_t getSize() const {
    +      return data_.size();
    +    }
    +    unsigned char const *getData() const {
    +      return data_.data();
    +    }
    +
    +    bool isRegistrationRequest() {
    +      if (data_.size() != 8) {
    +        return false;
    +      }
    +      return code_ == COAP_RESPONSE_400 && std::string((char*) 
data_.data(), data_.size()) == "register";
    +    }
    +    CoAPMessage &operator=(const CoAPMessage &other) = delete;
    +    CoAPMessage &operator=(CoAPMessage &&other) = default;
    +   private:
    +    unsigned int code_;
    +    std::vector<unsigned char> data_;
    +  };
    +
    +  /**
    +   * Constructors for the controller service.
    +   */
    +  explicit CoapConnectorService(const std::string &name, const std::string 
&id)
    +      : ControllerService(name, id),
    +        port_(0),
    +        initialized_(false),
    +        logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) 
{
    +    initialize();
    +  }
    +
    +  explicit CoapConnectorService(const std::string &name, utils::Identifier 
uuid = utils::Identifier())
    +      : ControllerService(name, uuid),
    +        port_(0),
    +        initialized_(false),
    +        logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) 
{
    +    initialize();
    +  }
    +
    +  explicit CoapConnectorService(const std::string &name, const 
std::shared_ptr<Configure> &configuration)
    +      : ControllerService(name),
    +        port_(0),
    +        initialized_(false),
    +        logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) 
{
    +    setConfiguration(configuration);
    +    initialize();
    +  }
    +
    +  /**
    +   * Parameters needed.
    +   */
    +  static core::Property RemoteServer;
    +  static core::Property Port;
    +  static core::Property MaxQueueSize;
    +
    +  virtual void initialize();
    +
    +  void yield() {
    +
    +  }
    +
    +  bool isRunning() {
    +    return getState() == core::controller::ControllerServiceState::ENABLED;
    +  }
    +
    +  bool isWorkAvailable() {
    +    return false;
    +  }
    +
    +  virtual void onEnable();
    +
    +  /**
    +   * Sends the payload to the endpoint, returning the response as we 
await. Will retry transmission
    +   * @param type type of payload to endpoint interaction ( GET, POST, PUT, 
DELETE ).
    +   * @param end endpoint is the connecting endpoint on the server
    +   * @param payload is the data to be sent
    +   * @param size size of the payload to be sent
    +   * @return CoAPMessage that contains the response code and data, if any.
    +   */
    +  CoAPMessage sendPayload(uint8_t type, const std::string endpoint, 
unsigned char *payload, size_t size);
    +
    + protected:
    +
    +  /**
    +   * Determines if the pointer is present in the internal map.
    +   */
    +  bool hasResponse(coap_context_t *ctx) {
    +    std::lock_guard<std::mutex> lock(connector_mutex_);
    +    return messages_.find(ctx) != messages_.end();
    +  }
    +
    +  void enqueueResponse(coap_context_t *ctx, CoAPMessage &&msg) {
    +    std::lock_guard<std::mutex> lock(connector_mutex_);
    +    messages_.insert(std::make_pair(ctx, std::move(msg)));
    +  }
    +
    +  void initializeProperties();
    +
    +  static void receiveError(void *receiver_context, coap_context_t *ctx, 
unsigned int code) {
    +    CoapConnectorService *connector = 
static_cast<CoapConnectorService*>(receiver_context);
    +    CoAPMessage message(code, 0x00, 0);
    +    connector->enqueueResponse(ctx, std::move(message));
    +  }
    +
    +  static void receiveMessage(void *receiver_context, coap_context_t *ctx, 
unsigned int code, unsigned char *dp, size_t *len) {
    +    CoapConnectorService *connector = 
static_cast<CoapConnectorService*>(receiver_context);
    +    CoAPMessage message(code, dp, *len);
    +    connector->enqueueResponse(ctx, std::move(message));
    +  }
    +
    +  // map of messages based on the context. We only allow a single message 
per context
    +  // at any given time.
    +  std::unordered_map<coap_context_t*, CoAPMessage> messages_;
    +
    +  // connector mutex to controll access to the mapping, above.
    +  std::mutex connector_mutex_;
    +
    +  // initialization mutex.
    +  std::mutex initialization_mutex_;
    +
    +  std::atomic<bool> initialized_;
    --- End diff --
    
    Do we plan to have multiple instances of COAPConnectorSerivce? 


---

Reply via email to