Github user jdye64 commented on a diff in the pull request:

    https://github.com/apache/nifi-minifi-cpp/pull/63#discussion_r105259428
  
    --- Diff: libminifi/include/Connection.h ---
    @@ -28,174 +28,168 @@
     #include <atomic>
     #include <algorithm>
     
    -#include "FlowFileRecord.h"
    -#include "Logger.h"
    -#include "Relationship.h"
    +#include "core/Connectable.h"
    +#include "core/Record.h"
    +#include "core/logging/Logger.h"
    +#include "core/Relationship.h"
    +#include "core/Connectable.h"
     
    -//! Forwarder declaration
    -class Processor;
     
    +namespace org {
    +namespace apache {
    +namespace nifi {
    +namespace minifi {
     //! Connection Class
    -class Connection
    -{
    -public:
    -   //! Constructor
    -   /*!
    -    * Create a new processor
    -    */
    -   explicit Connection(std::string name, uuid_t uuid = NULL, uuid_t 
srcUUID = NULL, uuid_t destUUID = NULL);
    -   //! Destructor
    -   virtual ~Connection() {}
    -   //! Set Connection Name
    -   void setName(std::string name) {
    -           _name = name;
    -   }
    -   //! Get Process Name
    -   std::string getName(void) {
    -           return (_name);
    -   }
    -   //! Set UUID
    -   void setUUID(uuid_t uuid) {
    -           uuid_copy(_uuid, uuid);
    -   }
    -   //! Set Source Processor UUID
    -   void setSourceProcessorUUID(uuid_t uuid) {
    -           uuid_copy(_srcUUID, uuid);
    -   }
    -   //! Set Destination Processor UUID
    -   void setDestinationProcessorUUID(uuid_t uuid) {
    -           uuid_copy(_destUUID, uuid);
    -   }
    -   //! Get Source Processor UUID
    -   void getSourceProcessorUUID(uuid_t uuid) {
    -           uuid_copy(uuid, _srcUUID);
    -   }
    -   //! Get Destination Processor UUID
    -   void getDestinationProcessorUUID(uuid_t uuid) {
    -           uuid_copy(uuid, _destUUID);
    -   }
    -   //! Get UUID
    -   bool getUUID(uuid_t uuid) {
    -           if (uuid)
    -           {
    -                   uuid_copy(uuid, _uuid);
    -                   return true;
    -           }
    -           else
    -                   return false;
    -   }
    -   //! Set Connection Source Processor
    -   void setSourceProcessor(Processor *source) {
    -           _srcProcessor = source;
    -   }
    -   // ! Get Connection Source Processor
    -   Processor *getSourceProcessor() {
    -           return _srcProcessor;
    -   }
    -   //! Set Connection Destination Processor
    -   void setDestinationProcessor(Processor *dest) {
    -           _destProcessor = dest;
    -   }
    -   // ! Get Connection Destination Processor
    -   Processor *getDestinationProcessor() {
    -           return _destProcessor;
    -   }
    -   //! Set Connection relationship
    -   void setRelationship(Relationship relationship) {
    -           _relationship = relationship;
    -   }
    -   // ! Get Connection relationship
    -   Relationship getRelationship() {
    -           return _relationship;
    -   }
    -   //! Set Max Queue Size
    -   void setMaxQueueSize(uint64_t size)
    -   {
    -           _maxQueueSize = size;
    -   }
    -   //! Get Max Queue Size
    -   uint64_t getMaxQueueSize()
    -   {
    -           return _maxQueueSize;
    -   }
    -   //! Set Max Queue Data Size
    -   void setMaxQueueDataSize(uint64_t size)
    -   {
    -           _maxQueueDataSize = size;
    -   }
    -   //! Get Max Queue Data Size
    -   uint64_t getMaxQueueDataSize()
    -   {
    -           return _maxQueueDataSize;
    -   }
    -   //! Set Flow expiration duration in millisecond
    -   void setFlowExpirationDuration(uint64_t duration)
    -   {
    -           _expiredDuration = duration;
    -   }
    -   //! Get Flow expiration duration in millisecond
    -   uint64_t getFlowExpirationDuration()
    -   {
    -           return _expiredDuration;
    -   }
    -   //! Check whether the queue is empty
    -   bool isEmpty();
    -   //! Check whether the queue is full to apply back pressure
    -   bool isFull();
    -   //! Get queue size
    -   uint64_t getQueueSize() {
    -           std::lock_guard<std::mutex> lock(_mtx);
    -           return _queue.size();
    -   }
    -   //! Get queue data size
    -   uint64_t getQueueDataSize()
    -   {
    -           return _maxQueueDataSize;
    -   }
    -   //! Put the flow file into queue
    -   void put(FlowFileRecord *flow);
    -   //! Poll the flow file from queue, the expired flow file record also 
being returned
    -   FlowFileRecord *poll(std::set<FlowFileRecord *> &expiredFlowRecords);
    -   //! Drain the flow records
    -   void drain();
    +class Connection : public core::Connectable,
    +    public std::enable_shared_from_this<Connection> {
    + public:
    +  //! Constructor
    +  /*!
    +   * Create a new processor
    +   */
    +  explicit Connection(std::string name, uuid_t uuid = NULL, uuid_t srcUUID 
=
    +  NULL,
    +                      uuid_t destUUID = NULL);
    +  //! Destructor
    +  virtual ~Connection() {
    +  }
     
    -protected:
    -   //! A global unique identifier
    -   uuid_t _uuid;
    -   //! Source Processor UUID
    -   uuid_t _srcUUID;
    -   //! Destination Processor UUID
    -   uuid_t _destUUID;
    -   //! Connection Name
    -   std::string _name;
    -   //! Relationship for this connection
    -   Relationship _relationship;
    -   //! Source Processor (ProcessNode/Port)
    -   Processor *_srcProcessor;
    -   //! Destination Processor (ProcessNode/Port)
    -   Processor *_destProcessor;
    -   //! Max queue size to apply back pressure
    -   std::atomic<uint64_t> _maxQueueSize;
    -   //! Max queue data size to apply back pressure
    -   std::atomic<uint64_t> _maxQueueDataSize;
    -   //! Flow File Expiration Duration in= MilliSeconds
    -   std::atomic<uint64_t> _expiredDuration;
    +  //! Set Source Processor UUID
    +  void setSourceUUID(uuid_t uuid) {
    +    uuid_copy(_srcUUID, uuid);
    +  }
    +  //! Set Destination Processor UUID
    +  void setDestinationUUID(uuid_t uuid) {
    +    uuid_copy(_destUUID, uuid);
    +  }
    +  //! Get Source Processor UUID
    +  void getSourceUUID(uuid_t uuid) {
    +    uuid_copy(uuid, _srcUUID);
    +  }
    +  //! Get Destination Processor UUID
    +  void getDestinationUUID(uuid_t uuid) {
    +    uuid_copy(uuid, _destUUID);
    +  }
     
    +  //! Set Connection Source Processor
    +  void setSource(
    +      std::shared_ptr<core::Connectable> source) {
    +    _srcProcessor = source;
    +  }
    +  // ! Get Connection Source Processor
    +  std::shared_ptr<core::Connectable> getSource() {
    +    return _srcProcessor;
    +  }
    +  //! Set Connection Destination Processor
    +  void setDestination(
    +      std::shared_ptr<core::Connectable> dest) {
    +    _destProcessor = dest;
    +  }
    +  // ! Get Connection Destination Processor
    +  std::shared_ptr<core::Connectable> getDestination() {
    +    return _destProcessor;
    +  }
    +  //! Set Connection relationship
    +  void setRelationship(
    +      core::Relationship relationship) {
    +    relationship_ = relationship;
    +  }
    +  // ! Get Connection relationship
    +  core::Relationship getRelationship() {
    +    return relationship_;
    +  }
    +  //! Set Max Queue Size
    +  void setMaxQueueSize(uint64_t size) {
    +    _maxQueueSize = size;
    +  }
    +  //! Get Max Queue Size
    +  uint64_t getMaxQueueSize() {
    +    return _maxQueueSize;
    +  }
    +  //! Set Max Queue Data Size
    +  void setMaxQueueDataSize(uint64_t size) {
    +    _maxQueueDataSize = size;
    +  }
    +  //! Get Max Queue Data Size
    +  uint64_t getMaxQueueDataSize() {
    +    return _maxQueueDataSize;
    +  }
    +  //! Set Flow expiration duration in millisecond
    +  void setFlowExpirationDuration(uint64_t duration) {
    +    _expiredDuration = duration;
    +  }
    +  //! Get Flow expiration duration in millisecond
    +  uint64_t getFlowExpirationDuration() {
    +    return _expiredDuration;
    +  }
    +  //! Check whether the queue is empty
    +  bool isEmpty();
    +  //! Check whether the queue is full to apply back pressure
    +  bool isFull();
    +  //! Get queue size
    +  uint64_t getQueueSize() {
    +    std::lock_guard<std::mutex> lock(_mtx);
    +    return queue_.size();
    +  }
    +  //! Get queue data size
    +  uint64_t getQueueDataSize() {
    +    return _maxQueueDataSize;
    +  }
    +  //! Put the flow file into queue
    +  void put(std::shared_ptr<core::Record> flow);
    +  //! Poll the flow file from queue, the expired flow file record also 
being returned
    +  std::shared_ptr<core::Record> poll(
    +      std::set<std::shared_ptr<core::Record>> &expiredFlowRecords);
    +  //! Drain the flow records
    +  void drain();
     
    -private:
    -   //! Mutex for protection
    -   std::mutex _mtx;
    -   //! Queued data size
    -   std::atomic<uint64_t> _queuedDataSize;
    -   //! Queue for the Flow File
    -   std::queue<FlowFileRecord *> _queue;
    -   //! Logger
    -   std::shared_ptr<Logger> logger_;
    -   // Prevent default copy constructor and assignment operation
    -   // Only support pass by reference or pointer
    -   Connection(const Connection &parent);
    -   Connection &operator=(const Connection &parent);
    +  void yield() {
     
    -};
    +  }
    +
    +  bool isWorkAvailable() {
    +    return !isEmpty();
    +  }
    +
    +  bool isRunning() {
    +    return true;
    +  }
     
    + protected:
    +  //! Source Processor UUID
    +  uuid_t _srcUUID;
    +  //! Destination Processor UUID
    +  uuid_t _destUUID;
    +  //! Relationship for this connection
    +  core::Relationship relationship_;
    +  //! Source Processor (ProcessNode/Port)
    +  std::shared_ptr<core::Connectable> _srcProcessor;
    +  //! Destination Processor (ProcessNode/Port)
    +  std::shared_ptr<core::Connectable> _destProcessor;
    +  //! Max queue size to apply back pressure
    +  std::atomic<uint64_t> _maxQueueSize;
    +  //! Max queue data size to apply back pressure
    +  std::atomic<uint64_t> _maxQueueDataSize;
    +  //! Flow File Expiration Duration in= MilliSeconds
    +  std::atomic<uint64_t> _expiredDuration;
    --- End diff --
    
    Can you move the "_" to the end of each class data member name (e.g. 
`_srcUUID` -> `src_uuid_`) to adhere to the Google C++ Style Guide? The same 
applies to all member variables here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to