Github user arpadboda commented on a diff in the pull request: https://github.com/apache/nifi-minifi-cpp/pull/437#discussion_r233499744 --- Diff: extensions/coap/controllerservice/CoapConnector.h --- @@ -0,0 +1,207 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_CONTROLLERS_COAPCONNECTOR_H_ +#define LIBMINIFI_INCLUDE_CONTROLLERS_COAPCONNECTOR_H_ + + +#include "core/logging/LoggerConfiguration.h" +#include "coap_functions.h" +#include "core/controller/ControllerService.h" +#include <memory> +#include <unordered_map> + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace controllers { + +/** + * Purpose and Justification: Controller services function as a layerable way to provide + * services to internal services. While a controller service is generally configured from the flow, + * we want to follow the open closed principle and provide CoAP services to other components. + * + * + */ +class CoapConnectorService : public core::controller::ControllerService { + public: + + /** + * CoapMessage is in internal message format that is sent to and from consumers of this controller service. 
+ */ + class CoAPMessage { + public: + + explicit CoAPMessage(unsigned int code = 0) + : code_(code) { + } + + explicit CoAPMessage(unsigned int code, unsigned char *data, size_t dataLen) + : code_(code) { + if (data && dataLen > 0) + std::copy(data, data + dataLen, std::back_inserter(data_)); + } + + CoAPMessage(const CoAPMessage &other) = delete; + + CoAPMessage(CoAPMessage &&other) = default; + + ~CoAPMessage() { + } + + size_t getSize() const { + return data_.size(); + } + unsigned char const *getData() const { + return data_.data(); + } + + bool isRegistrationRequest() { + if (data_.size() != 8) { + return false; + } + return code_ == COAP_RESPONSE_400 && std::string((char*) data_.data(), data_.size()) == "register"; + } + CoAPMessage &operator=(const CoAPMessage &other) = delete; + CoAPMessage &operator=(CoAPMessage &&other) = default; + private: + unsigned int code_; + std::vector<unsigned char> data_; + }; + + /** + * Constructors for the controller service. + */ + explicit CoapConnectorService(const std::string &name, const std::string &id) + : ControllerService(name, id), + port_(0), + initialized_(false), + logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) { + initialize(); + } + + explicit CoapConnectorService(const std::string &name, utils::Identifier uuid = utils::Identifier()) + : ControllerService(name, uuid), + port_(0), + initialized_(false), + logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) { + initialize(); + } + + explicit CoapConnectorService(const std::string &name, const std::shared_ptr<Configure> &configuration) + : ControllerService(name), + port_(0), + initialized_(false), + logger_(logging::LoggerFactory<CoapConnectorService>::getLogger()) { + setConfiguration(configuration); + initialize(); + } + + /** + * Parameters needed. 
+ */ + static core::Property RemoteServer; + static core::Property Port; + static core::Property MaxQueueSize; + + virtual void initialize(); + + void yield() { + + } + + bool isRunning() { + return getState() == core::controller::ControllerServiceState::ENABLED; + } + + bool isWorkAvailable() { + return false; + } + + virtual void onEnable(); + + /** + * Sends the payload to the endpoint, returning the response as we await. Will retry transmission + * @param type type of payload to endpoint interaction ( GET, POST, PUT, DELETE ). + * @param end endpoint is the connecting endpoint on the server + * @param payload is the data to be sent + * @param size size of the payload to be sent + * @return CoAPMessage that contains the response code and data, if any. + */ + CoAPMessage sendPayload(uint8_t type, const std::string endpoint, unsigned char *payload, size_t size); + + protected: + + /** + * Determines if the pointer is present in the internal map. + */ + bool hasResponse(coap_context_t *ctx) { + std::lock_guard<std::mutex> lock(connector_mutex_); + return messages_.find(ctx) != messages_.end(); + } + + void enqueueResponse(coap_context_t *ctx, CoAPMessage &&msg) { + std::lock_guard<std::mutex> lock(connector_mutex_); + messages_.insert(std::make_pair(ctx, std::move(msg))); + } + + void initializeProperties(); + + static void receiveError(void *receiver_context, coap_context_t *ctx, unsigned int code) { + CoapConnectorService *connector = static_cast<CoapConnectorService*>(receiver_context); + CoAPMessage message(code, 0x00, 0); + connector->enqueueResponse(ctx, std::move(message)); + } + + static void receiveMessage(void *receiver_context, coap_context_t *ctx, unsigned int code, unsigned char *dp, size_t *len) { + CoapConnectorService *connector = static_cast<CoapConnectorService*>(receiver_context); + CoAPMessage message(code, dp, *len); + connector->enqueueResponse(ctx, std::move(message)); + } + + // map of messages based on the context. 
We only allow a single message per context + // at any given time. + std::unordered_map<coap_context_t*, CoAPMessage> messages_; + + // connector mutex to controll access to the mapping, above. + std::mutex connector_mutex_; + + // initialization mutex. + std::mutex initialization_mutex_; + + std::atomic<bool> initialized_; --- End diff -- Do we plan to have multiple instances of CoapConnectorService?
---