http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/ListenSyslog.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/ListenSyslog.h b/libminifi/include/processors/ListenSyslog.h index f9dc678..69223c4 100644 --- a/libminifi/include/processors/ListenSyslog.h +++ b/libminifi/include/processors/ListenSyslog.h @@ -21,14 +21,17 @@ #define __LISTEN_SYSLOG_H__ #include <stdio.h> -#include <unistd.h> #include <sys/types.h> +#ifndef WIN32 #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> -#include <errno.h> #include <sys/select.h> #include <sys/time.h> +#else +#include <WinSock2.h> +#endif +#include <errno.h> #include <sys/types.h> #include <chrono> #include <thread> @@ -39,12 +42,15 @@ #include "core/Resource.h" #include "core/logging/LoggerConfiguration.h" +#ifndef WIN32 + namespace org { namespace apache { namespace nifi { namespace minifi { namespace processors { + // SyslogEvent typedef struct { char *payload; @@ -58,7 +64,7 @@ class ListenSyslog : public core::Processor { /*! * Create a new processor */ - ListenSyslog(std::string name, uuid_t uuid = NULL) + ListenSyslog(std::string name, utils::Identifier uuid = utils::Identifier()) : Processor(name, uuid), logger_(logging::LoggerFactory<ListenSyslog>::getLogger()) { _eventQueueByteSize = 0; @@ -216,3 +222,5 @@ REGISTER_RESOURCE(ListenSyslog); } /* namespace org */ #endif + +#endif
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/LogAttribute.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/LogAttribute.h b/libminifi/include/processors/LogAttribute.h index b9e333f..412e068 100644 --- a/libminifi/include/processors/LogAttribute.h +++ b/libminifi/include/processors/LogAttribute.h @@ -40,7 +40,7 @@ class LogAttribute : public core::Processor { /*! * Create a new processor */ - LogAttribute(std::string name, uuid_t uuid = NULL) + LogAttribute(std::string name, utils::Identifier uuid = utils::Identifier()) : Processor(name, uuid), logger_(logging::LoggerFactory<LogAttribute>::getLogger()) { } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/PutFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/PutFile.h b/libminifi/include/processors/PutFile.h index ddcdac1..ba410d5 100644 --- a/libminifi/include/processors/PutFile.h +++ b/libminifi/include/processors/PutFile.h @@ -46,7 +46,7 @@ class PutFile : public core::Processor { /*! 
* Create a new processor */ - PutFile(std::string name, uuid_t uuid = NULL) + PutFile(std::string name, utils::Identifier uuid = utils::Identifier()) : core::Processor(name, uuid), logger_(logging::LoggerFactory<PutFile>::getLogger()) { } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/RouteOnAttribute.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/RouteOnAttribute.h b/libminifi/include/processors/RouteOnAttribute.h index 1c1eb45..0737f8f 100644 --- a/libminifi/include/processors/RouteOnAttribute.h +++ b/libminifi/include/processors/RouteOnAttribute.h @@ -36,7 +36,7 @@ namespace processors { class RouteOnAttribute : public core::Processor { public: - RouteOnAttribute(std::string name, uuid_t uuid = NULL) + RouteOnAttribute(std::string name, utils::Identifier uuid = utils::Identifier()) : core::Processor(name, uuid), logger_(logging::LoggerFactory<RouteOnAttribute>::getLogger()) { } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/TailFile.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/TailFile.h b/libminifi/include/processors/TailFile.h index fd59cb5..a98e990 100644 --- a/libminifi/include/processors/TailFile.h +++ b/libminifi/include/processors/TailFile.h @@ -40,7 +40,7 @@ class TailFile : public core::Processor { /*! 
* Create a new processor */ - explicit TailFile(std::string name, uuid_t uuid = NULL) + explicit TailFile(std::string name, utils::Identifier uuid = utils::Identifier()) : core::Processor(name, uuid), logger_(logging::LoggerFactory<TailFile>::getLogger()) { _stateRecovered = false; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/processors/UpdateAttribute.h ---------------------------------------------------------------------- diff --git a/libminifi/include/processors/UpdateAttribute.h b/libminifi/include/processors/UpdateAttribute.h index 117c78a..a768a87 100644 --- a/libminifi/include/processors/UpdateAttribute.h +++ b/libminifi/include/processors/UpdateAttribute.h @@ -36,7 +36,7 @@ namespace processors { class UpdateAttribute : public core::Processor { public: - UpdateAttribute(std::string name, uuid_t uuid = NULL) + UpdateAttribute(std::string name, utils::Identifier uuid = utils::Identifier()) : core::Processor(name, uuid), logger_(logging::LoggerFactory<UpdateAttribute>::getLogger()) { } @@ -64,7 +64,7 @@ class UpdateAttribute : public core::Processor { private: std::shared_ptr<logging::Logger> logger_; - std::vector<std::string> attributes_; + std::vector<core::Property> attributes_; }; REGISTER_RESOURCE(UpdateAttribute); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/provenance/Provenance.h ---------------------------------------------------------------------- diff --git a/libminifi/include/provenance/Provenance.h b/libminifi/include/provenance/Provenance.h index 72fd379..5ec33fa 100644 --- a/libminifi/include/provenance/Provenance.h +++ b/libminifi/include/provenance/Provenance.h @@ -18,7 +18,6 @@ #ifndef __PROVENANCE_H__ #define __PROVENANCE_H__ -#include <ftw.h> #include <uuid/uuid.h> #include <atomic> #include <cstdint> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/Peer.h 
---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/Peer.h b/libminifi/include/sitetosite/Peer.h index 3775c7c..2c9e2a0 100644 --- a/libminifi/include/sitetosite/Peer.h +++ b/libminifi/include/sitetosite/Peer.h @@ -19,9 +19,6 @@ #define LIBMINIFI_INCLUDE_SITETOSITE_PEER_H_ #include <stdio.h> -#include <fcntl.h> -#include <resolv.h> -#include <netdb.h> #include <string> #include <errno.h> #include <uuid/uuid.h> @@ -36,6 +33,7 @@ #include "properties/Configure.h" #include "io/ClientSocket.h" #include "io/BaseStream.h" +#include "io/DataStream.h" #include "utils/TimeUtil.h" #include "utils/HTTPClient.h" @@ -47,11 +45,11 @@ namespace sitetosite { class Peer { public: - explicit Peer(uuid_t port_id, const std::string &host, uint16_t port, bool secure = false) + explicit Peer(utils::Identifier &port_id, const std::string &host, uint16_t port, bool secure = false) : host_(host), port_(port), secure_(secure) { - uuid_copy(port_id_, port_id); + port_id_ = port_id; } explicit Peer(const std::string &host, uint16_t port, bool secure = false) @@ -64,14 +62,14 @@ class Peer { : host_(other.host_), port_(other.port_), secure_(other.secure_) { - uuid_copy(port_id_, other.port_id_); + port_id_ = other.port_id_; } - explicit Peer(const Peer &&other) + explicit Peer(Peer &&other) : host_(std::move(other.host_)), port_(std::move(other.port_)), secure_(std::move(other.secure_)) { - uuid_copy(port_id_, other.port_id_); + port_id_ = other.port_id_; } uint16_t getPort() const { @@ -86,8 +84,8 @@ class Peer { return secure_; } - void getPortId(uuid_t other) const { - uuid_copy(other, port_id_); + void getPortId(utils::Identifier &other) const { + other = port_id_; } protected: @@ -95,7 +93,7 @@ class Peer { uint16_t port_; - uuid_t port_id_; + utils::Identifier port_id_; // secore comms @@ -149,12 +147,12 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { /* * Create a new site2site peer */ - explicit 
SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket, const std::string host, uint16_t port, const std::string &interface) - : SiteToSitePeer(host, port, interface) { + explicit SiteToSitePeer(std::unique_ptr<org::apache::nifi::minifi::io::DataStream> injected_socket, const std::string host, uint16_t port, const std::string &ifc) + : SiteToSitePeer(host, port, ifc) { stream_ = std::move(injected_socket); } - explicit SiteToSitePeer(const std::string &host, uint16_t port, const std::string &interface) + explicit SiteToSitePeer(const std::string &host, uint16_t port, const std::string &ifc) : stream_(nullptr), host_(host), port_(port), @@ -164,7 +162,7 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { url_ = "nifi://" + host_ + ":" + std::to_string(port_); yield_expiration_ = 0; timeout_ = 30000; // 30 seconds - local_network_interface_= std::move(io::NetworkInterface(interface, nullptr)); + local_network_interface_= std::move(io::NetworkInterface(ifc, nullptr)); } explicit SiteToSitePeer(SiteToSitePeer &&ss) @@ -191,8 +189,8 @@ class SiteToSitePeer : public org::apache::nifi::minifi::io::BaseStream { return url_; } // setInterface - void setInterface(std::string &interface) { - local_network_interface_ = std::move(io::NetworkInterface(interface,nullptr)); + void setInterface(std::string &ifc) { + local_network_interface_ = std::move(io::NetworkInterface(ifc,nullptr)); } std::string getInterface() { return local_network_interface_.getInterface(); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/RawSocketProtocol.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/RawSocketProtocol.h b/libminifi/include/sitetosite/RawSocketProtocol.h index 7a075bf..0c7feb5 100644 --- a/libminifi/include/sitetosite/RawSocketProtocol.h +++ b/libminifi/include/sitetosite/RawSocketProtocol.h @@ -21,13 +21,7 @@ 
#define __SITE2SITE_CLIENT_PROTOCOL_H__ #include <stdio.h> -#include <unistd.h> #include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <fcntl.h> -#include <netdb.h> #include <string> #include <errno.h> #include <chrono> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/SiteToSite.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSite.h b/libminifi/include/sitetosite/SiteToSite.h index 484a277..2cc0823 100644 --- a/libminifi/include/sitetosite/SiteToSite.h +++ b/libminifi/include/sitetosite/SiteToSite.h @@ -245,8 +245,7 @@ class Transaction { // Generate the global UUID for the transaction id_generator_->generate(uuid_); - uuid_unparse_lower(uuid_, uuidStr); - uuid_str_ = uuidStr; + uuid_str_ = uuid_.to_string(); } // Destructor virtual ~Transaction() { @@ -262,7 +261,7 @@ class Transaction { void setUUIDStr(const std::string &str) { uuid_str_ = str; - uuid_parse(str.c_str(), uuid_); + uuid_ = str; } // getState @@ -323,7 +322,7 @@ class Transaction { TransferDirection _direction; // A global unique identifier - uuid_t uuid_; + utils::Identifier uuid_; // UUID string std::string uuid_str_; @@ -332,10 +331,10 @@ class Transaction { class SiteToSiteClientConfiguration { public: - SiteToSiteClientConfiguration(std::shared_ptr<io::StreamFactory> stream_factory, const std::shared_ptr<Peer> &peer, const std::string &interface, CLIENT_TYPE type = RAW) + SiteToSiteClientConfiguration(std::shared_ptr<io::StreamFactory> stream_factory, const std::shared_ptr<Peer> &peer, const std::string &ifc, CLIENT_TYPE type = RAW) : stream_factory_(stream_factory), peer_(peer), - local_network_interface_(interface), + local_network_interface_(ifc), ssl_service_(nullptr) { client_type_ = type; } @@ -363,8 +362,8 @@ class SiteToSiteClientConfiguration { } // setInterface - void setInterface(std::string &interface) { - 
local_network_interface_ = interface; + void setInterface(std::string &ifc) { + local_network_interface_ = ifc; } std::string getInterface() const { return local_network_interface_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/SiteToSiteClient.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSiteClient.h b/libminifi/include/sitetosite/SiteToSiteClient.h index 259b95e..6469e82 100644 --- a/libminifi/include/sitetosite/SiteToSiteClient.h +++ b/libminifi/include/sitetosite/SiteToSiteClient.h @@ -56,7 +56,7 @@ class SiteToSiteClient : public core::Connectable { public: SiteToSiteClient() - : core::Connectable("SitetoSiteClient", 0), + : core::Connectable("SitetoSiteClient"), peer_state_(IDLE), _batchSendNanos(5000000000), ssl_context_service_(nullptr), @@ -96,11 +96,20 @@ class SiteToSiteClient : public core::Connectable { * @returns true if the process succeeded, failure OR exception thrown otherwise */ virtual bool transfer(TransferDirection direction, const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { - if (__builtin_expect(direction == SEND, 1)) { +#ifndef WIN32 + if (__builtin_expect(direction == SEND, 1)) { return transferFlowFiles(context, session); } else { return receiveFlowFiles(context, session); } +#else + if (direction == SEND) { + return transferFlowFiles(context, session); + } + else { + return receiveFlowFiles(context, session); + } +#endif } /** @@ -136,11 +145,9 @@ class SiteToSiteClient : public core::Connectable { virtual bool transmitPayload(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session, const std::string &payload, std::map<std::string, std::string> attributes) = 0; - void setPortId(uuid_t id) { - uuid_copy(port_id_, id); - char idStr[37]; - uuid_unparse_lower(id, idStr); - port_id_str_ = idStr; + void 
setPortId(utils::Identifier &id) { + port_id_ = id; + port_id_str_ = port_id_.to_string(); } /** @@ -241,7 +248,7 @@ class SiteToSiteClient : public core::Connectable { std::string port_id_str_; // portId - uuid_t port_id_; + utils::Identifier port_id_; // Peer Connection std::unique_ptr<SiteToSitePeer> peer_; @@ -285,7 +292,7 @@ class WriteCallback : public OutputStreamCallback { int len = _packet->_size; int total = 0; while (len > 0) { - int size = std::min(len, 16384); + int size = len < 16384 ? len : 16384; int ret = _packet->transaction_->getStream().readData(buffer, size); if (ret != size) { logging::LOG_ERROR(_packet->logger_reference_) << "Site2Site Receive Flow Size " << size << " Failed " << ret << ", should have received " << len; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/sitetosite/SiteToSiteFactory.h ---------------------------------------------------------------------- diff --git a/libminifi/include/sitetosite/SiteToSiteFactory.h b/libminifi/include/sitetosite/SiteToSiteFactory.h index 4f0f36c..2d1dd74 100644 --- a/libminifi/include/sitetosite/SiteToSiteFactory.h +++ b/libminifi/include/sitetosite/SiteToSiteFactory.h @@ -58,7 +58,7 @@ static std::unique_ptr<SiteToSitePeer> createStreamingPeer(const SiteToSiteClien * RawSiteToSiteClient will be instantiated and returned through a unique ptr. */ static std::unique_ptr<SiteToSiteClient> createRawSocket(const SiteToSiteClientConfiguration &client_configuration) { - uuid_t uuid; + utils::Identifier uuid; client_configuration.getPeer()->getPortId(uuid); auto rsptr = createStreamingPeer(client_configuration); if (nullptr == rsptr){ @@ -77,7 +77,7 @@ static std::unique_ptr<SiteToSiteClient> createRawSocket(const SiteToSiteClientC * @returns site to site client or nullptr. 
*/ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConfiguration &client_configuration) { - uuid_t uuid; + utils::Identifier uuid; client_configuration.getPeer()->getPortId(uuid); switch (client_configuration.getClientType()) { case RAW: @@ -90,8 +90,7 @@ static std::unique_ptr<SiteToSiteClient> createClient(const SiteToSiteClientConf auto peer = std::unique_ptr<SiteToSitePeer>(new SiteToSitePeer(client_configuration.getPeer()->getHost(), client_configuration.getPeer()->getPort(), client_configuration.getInterface())); peer->setHTTPProxy(client_configuration.getHTTPProxy()); - char idStr[37]; - uuid_unparse_lower(uuid, idStr); + ptr->setPortId(uuid); ptr->setPeer(std::move(peer)); return ptr; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/HTTPClient.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/HTTPClient.h b/libminifi/include/utils/HTTPClient.h index 553dcea..87bf659 100644 --- a/libminifi/include/utils/HTTPClient.h +++ b/libminifi/include/utils/HTTPClient.h @@ -198,7 +198,7 @@ class HTTPRequestResponse { }; class BaseHTTPClient { - public: +public: explicit BaseHTTPClient(const std::string &url, const std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service = nullptr) { response_code = -1; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/Id.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/Id.h b/libminifi/include/utils/Id.h index d9f0811..46537be 100644 --- a/libminifi/include/utils/Id.h +++ b/libminifi/include/utils/Id.h @@ -17,10 +17,11 @@ #ifndef LIBMINIFI_INCLUDE_UTILS_ID_H_ #define LIBMINIFI_INCLUDE_UTILS_ID_H_ +#include <cstddef> #include <atomic> #include <memory> #include <string> -#include <uuid/uuid.h> +#include "uuid/uuid.h" #include "core/logging/Logger.h" #include "properties/Properties.h" 
@@ -37,9 +38,91 @@ namespace nifi { namespace minifi { namespace utils { +template<typename T, typename C> +class IdentifierBase { + public: + + IdentifierBase(T myid) { + copyInto(myid); + } + + IdentifierBase(const IdentifierBase &other) { + copyInto(other.id_); + } + + IdentifierBase() { + } + + IdentifierBase &operator=(const IdentifierBase &other) { + copyInto(other.id_); + return *this; + } + + IdentifierBase &operator=(T o) { + copyInto(o); + return *this; + } + + void getIdentifier(T other) const { + copyOutOf(other); + } + + C convert() const { + return converted_; + } + + protected: + + void copyInto(const IdentifierBase &other){ + memcpy(id_, other.id_, sizeof(T)); + } + + void copyInto(const void *other) { + memcpy(id_, other, sizeof(T)); + } + + void copyOutOf(void *other) { + memcpy(other, id_, sizeof(T)); + } + + C converted_; + + T id_; +}; + +class Identifier : public IdentifierBase<UUID_FIELD, std::string> { + public: + Identifier(UUID_FIELD u); + Identifier(); + Identifier(const Identifier &other); + Identifier(const IdentifierBase &other); + + Identifier &operator=(const IdentifierBase &other); + Identifier &operator=(const Identifier &other); + Identifier &operator=(UUID_FIELD o); + + Identifier &operator=(std::string id); + bool operator==(std::nullptr_t nullp); + + bool operator!=(std::nullptr_t nullp); + + bool operator!=(const Identifier &other); + bool operator==(const Identifier &other); + + std::string to_string(); + + unsigned char *toArray(); + + protected: + + void build_string(); + +}; + class IdGenerator { public: - void generate(uuid_t output); + void generate(Identifier &output); + Identifier generate(); void initialize(const std::shared_ptr<Properties> & properties); static std::shared_ptr<IdGenerator> getIdGenerator() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/StringUtils.h ---------------------------------------------------------------------- diff --git 
a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h index 7f33260..24adae1 100644 --- a/libminifi/include/utils/StringUtils.h +++ b/libminifi/include/utils/StringUtils.h @@ -17,6 +17,11 @@ #ifndef LIBMINIFI_INCLUDE_IO_STRINGUTILS_H_ #define LIBMINIFI_INCLUDE_IO_STRINGUTILS_H_ #include <iostream> +#include <functional> +#ifdef WIN32 + #include <cwctype> + #include <cctype> +#endif #include <algorithm> #include <sstream> #include <vector> @@ -68,7 +73,7 @@ class StringUtils { * @returns modified string */ static inline std::string trimLeft(std::string s) { - s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::pointer_to_unary_function<int, int>(std::isspace)))); + s.erase(s.begin(), std::find_if(s.begin(), s.end(), std::not1(std::pointer_to_unary_function<int, int>(isspace)))); return s; } @@ -79,7 +84,7 @@ class StringUtils { */ static inline std::string trimRight(std::string s) { - s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::pointer_to_unary_function<int, int>(std::isspace))).base(), s.end()); + s.erase(std::find_if(s.rbegin(), s.rend(), std::not1(std::pointer_to_unary_function<int, int>(isspace))).base(), s.end()); return s; } @@ -88,7 +93,7 @@ class StringUtils { */ static inline bool equalsIgnoreCase(const std::string &left, const std::string right) { if (left.length() == right.length()) { - return std::equal(right.begin(), right.end(), left.begin(), [](unsigned char lc, unsigned char rc) {return std::tolower(lc) == std::tolower(rc);}); + return std::equal(right.begin(), right.end(), left.begin(), [](unsigned char lc, unsigned char rc) {return tolower(lc) == tolower(rc);}); } else { return false; } @@ -200,7 +205,7 @@ class StringUtils { inline static bool endsWithIgnoreCase(const std::string &value, const std::string & endString) { if (endString.size() > value.size()) return false; - return std::equal(endString.rbegin(), endString.rend(), value.rbegin(), [](unsigned char lc, unsigned char rc) {return 
std::tolower(lc) == std::tolower(rc);}); + return std::equal(endString.rbegin(), endString.rend(), value.rbegin(), [](unsigned char lc, unsigned char rc) {return tolower(lc) == tolower(rc);}); } inline static bool endsWith(const std::string &value, const std::string & endString) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/TimeUtil.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h index 19c2566..0dc3b5c 100644 --- a/libminifi/include/utils/TimeUtil.h +++ b/libminifi/include/utils/TimeUtil.h @@ -18,9 +18,6 @@ #define __TIME_UTIL_H__ #include <time.h> -#include <sys/time.h> -#include <string.h> -#include <unistd.h> #include <string.h> #include <iomanip> #include <sstream> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/file/DiffUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/file/DiffUtils.h b/libminifi/include/utils/file/DiffUtils.h index d44360d..313e0f6 100644 --- a/libminifi/include/utils/file/DiffUtils.h +++ b/libminifi/include/utils/file/DiffUtils.h @@ -24,10 +24,8 @@ #else #include <cstdlib> #include <sys/stat.h> -#include <dirent.h> #endif #include <cstdio> -#include <unistd.h> #include <fcntl.h> #ifdef WIN32 #define stat _stat http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/file/FileManager.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/file/FileManager.h b/libminifi/include/utils/file/FileManager.h index f6a4312..247f249 100644 --- a/libminifi/include/utils/file/FileManager.h +++ b/libminifi/include/utils/file/FileManager.h @@ -23,7 +23,6 @@ #include <cstdlib> #endif #include <cstdio> -#include <unistd.h> #include <fcntl.h> #include "io/validation.h" #include "utils/Id.h" 
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/include/utils/file/FileUtils.h ---------------------------------------------------------------------- diff --git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h index 33ca437..dd307ee 100644 --- a/libminifi/include/utils/file/FileUtils.h +++ b/libminifi/include/utils/file/FileUtils.h @@ -17,6 +17,9 @@ #ifndef LIBMINIFI_INCLUDE_UTILS_FILEUTILS_H_ #define LIBMINIFI_INCLUDE_UTILS_FILEUTILS_H_ + + + #include <sstream> #include <fstream> #ifdef BOOST_VERSION @@ -24,14 +27,30 @@ #else #include <cstring> #include <cstdlib> +#ifdef WIN32 +#define WIN32_LEAN_AND_MEAN +#include <WinSock2.h> +#include <WS2tcpip.h> +#include <Windows.h> +#pragma comment(lib, "Ws2_32.lib") +#else #include <sys/stat.h> #include <dirent.h> #endif +#endif #include <cstdio> +#ifndef WIN32 #include <unistd.h> +#endif #include <fcntl.h> #ifdef WIN32 #define stat _stat +#include <direct.h> +#include <windows.h> // winapi +#include <sys/stat.h> // stat +#include <tchar.h> // _tcscpy,_tcscat,_tcscmp +#include <string> // string +#include <algorithm> // replace #endif namespace org { @@ -72,6 +91,50 @@ class FileUtils { //display error message } return 0; +#elif defined(WIN32) + WIN32_FIND_DATA FindFileData; + HANDLE hFind; + DWORD Attributes; + std::string str; + + + std::stringstream pathstr; + pathstr << path << "\\.*"; + str = pathstr.str(); + + + //List files + hFind = FindFirstFile(str.c_str(), &FindFileData); + if (hFind != INVALID_HANDLE_VALUE) + { + do { + if (strcmp(FindFileData.cFileName, ".") != 0 && strcmp(FindFileData.cFileName, "..") != 0) + { + //Str append Example + + std::stringstream strs; + strs << path << "\\" << FindFileData.cFileName; + str = strs.str(); + + //_tprintf (TEXT("File Found: %s\n"),str); + Attributes = GetFileAttributes(str.c_str()); + if (Attributes & FILE_ATTRIBUTE_DIRECTORY) + { + //is directory + delete_dir(str, delete_files_recursively); + } + else 
+ { + remove(str.c_str()); + //not directory + } + } + } while (FindNextFile(hFind, &FindFileData)); + FindClose(hFind); + + RemoveDirectory(path.c_str()); + } + return 0; #else DIR *current_directory = opendir(path.c_str()); int r = -1; @@ -136,11 +199,15 @@ class FileUtils { #else struct stat dir_stat; if (stat(path.c_str(), &dir_stat)) { - mkdir(path.c_str(), 0700); +#ifdef WIN32 + _mkdir(path.c_str()); +#else + mkdir(path.c_str(), 0700); +#endif + return 0; } - return 0; + return -1; #endif - return -1; } static int copy_file(const std::string &path_from, const std::string dest_path) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/opsys/posix/io/ClientSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/opsys/posix/io/ClientSocket.h b/libminifi/opsys/posix/io/ClientSocket.h new file mode 100644 index 0000000..4ed8b99 --- /dev/null +++ b/libminifi/opsys/posix/io/ClientSocket.h @@ -0,0 +1,296 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#ifndef LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_ +#define LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_ + + +#include <cstdint> +#include <sys/types.h> +#include <sys/socket.h> +#include <netdb.h> +#include <unistd.h> +#include <mutex> +#include <atomic> +#include "io/BaseStream.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "io/validation.h" +#include "properties/Configure.h" +#include "io/NetworkPrioritizer.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +/** + * Context class for socket. This is currently only used as a parent class for TLSContext. It is necessary so the Socket and TLSSocket constructors + * can be the same. It also gives us a common place to set timeouts, etc from the Configure object in the future. + */ +class SocketContext { + public: + SocketContext(const std::shared_ptr<Configure> &configure) { + } +}; +/** + * Socket class. + * Purpose: Provides a general purpose socket interface that abstracts + * connecting information from users + * Design: Extends DataStream and allows us to perform most streaming + * operations against a BSD socket + * + * + */ +class Socket : public BaseStream { + public: + /** + * Constructor that creates a client socket. + * @param context the SocketContext + * @param hostname hostname we are connecting to. + * @param port port we are connecting to. + */ + explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port); + + /** + * Move constructor. + */ + explicit Socket(const Socket &&); + + /** + * Static function to return the current machine's host name + */ + static std::string getMyHostName() { + static std::string HOSTNAME = init_hostname(); + return HOSTNAME; + } + + /** + * Destructor + */ + + virtual ~Socket(); + + virtual void closeStream(); + /** + * Initializes the socket + * @return result of the creation operation. 
+ */ + virtual int16_t initialize(); + + virtual void setInterface(io::NetworkInterface &&interface) { + local_network_interface_ = std::move(interface); + } + + /** + * Sets the non blocking flag on the file descriptor. + */ + void setNonBlocking(); + + std::string getHostname() const; + + /** + * Return the port for this socket + * @returns port + */ + uint16_t getPort(); + + // data stream extensions + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + * Delegates to the three-argument overload with retrieve_all_bytes set to true + */ + virtual int readData(std::vector<uint8_t> &buf, int buflen) { + return readData(buf, buflen, true); + } + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + * Delegates to the three-argument overload with retrieve_all_bytes set to true + */ + virtual int readData(uint8_t *buf, int buflen) { + return readData(buf, buflen, true); + } + + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + * @param retrieve_all_bytes determines if we should read all bytes before returning + */ + virtual int readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes); + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + * @param retrieve_all_bytes determines if we should read all bytes before returning + */ + virtual int readData(uint8_t *buf, int buflen, bool retrieve_all_bytes); + + /** + * Write value to the stream using std::vector + * @param buf incoming buffer + * @param buflen buffer to write + * + */ + virtual int writeData(std::vector<uint8_t> &buf, int buflen); + + /** + * writes value to stream + * @param value value to write + * @param size size of value + */ + virtual int writeData(uint8_t *value, int size); + + /** + * Writes a system word + * @param value value to write + */ + virtual 
int write(uint64_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * Writes a uint32_t + * @param value value to write + */ + virtual int write(uint32_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * Writes a system short + * @param value value to write + */ + virtual int write(uint16_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * Reads a system word + * @param value value to write + */ + virtual int read(uint64_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * Reads a uint32_t + * @param value value to write + */ + virtual int read(uint32_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * Reads a system short + * @param value value to write + */ + virtual int read(uint16_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * Returns the underlying buffer + * @return vector's array + **/ + const uint8_t *getBuffer() const { + return DataStream::getBuffer(); + } + + /** + * Retrieve size of data stream + * @return size of data stream + **/ + const uint64_t getSize() const { + return DataStream::getSize(); + } + + protected: + + /** + * Constructor that accepts host name, port and listeners. With this + * constructor we will be creating a server socket + * @param context the SocketContext + * @param hostname our host name + * @param port connecting port + * @param listeners number of listeners in the queue + */ + explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners); + + /** + * Creates a vector and returns the vector using the provided + * type name. + * @param t incoming object + * @returns vector. + */ + template<typename T> + std::vector<uint8_t> readBuffer(const T&); + + /** + * Creates a connection using the address info object. + * @param p addrinfo structure. + * @returns fd. 
+ */ + virtual int8_t createConnection(const addrinfo *p, in_addr_t &addr); + + /** + * Sets socket options depending on the instance. + * @param sock socket file descriptor. + */ + virtual int16_t setSocketOptions(const int sock); + + /** + * Attempt to select the socket file descriptor + * @param msec timeout interval to wait + * @returns file descriptor + */ + virtual int16_t select_descriptor(const uint16_t msec); + + addrinfo *addr_info_; + + std::recursive_mutex selection_mutex_; + + std::string requested_hostname_; + std::string canonical_hostname_; + uint16_t port_; + + bool is_loopback_only_; + io::NetworkInterface local_network_interface_; + + // connection information + int32_t socket_file_descriptor_; + + fd_set total_list_; + fd_set read_fds_; + std::atomic<uint16_t> socket_max_; + std::atomic<uint64_t> total_written_; + std::atomic<uint64_t> total_read_; + uint16_t listeners_; + + + bool nonBlocking_; + private: + std::shared_ptr<logging::Logger> logger_; + static std::string init_hostname() { + char hostname[1024]; + gethostname(hostname, 1024); + Socket mySock(nullptr, hostname, 0); + mySock.initialize(); + auto resolved_hostname = mySock.getHostname(); + return !IsNullOrEmpty(resolved_hostname) ? resolved_hostname : hostname; + } +}; + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif /* LIBMINIFI_INCLUDE_IO_POSIX_CLIENTSOCKET_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/opsys/win/io/ClientSocket.h ---------------------------------------------------------------------- diff --git a/libminifi/opsys/win/io/ClientSocket.h b/libminifi/opsys/win/io/ClientSocket.h new file mode 100644 index 0000000..3621215 --- /dev/null +++ b/libminifi/opsys/win/io/ClientSocket.h @@ -0,0 +1,343 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef LIBMINIFI_INCLUDE_IO_WIN__CLIENTSOCKET_H_ +#define LIBMINIFI_INCLUDE_IO_WIN__CLIENTSOCKET_H_ + +#include <cstdint> +#include <sys/types.h> +#ifdef WIN32 +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif +//#include <WinSock.h> +//#include <WS2tcpip.h> +#include <Windows.h> +#include <WS2tcpip.h> +#pragma comment(lib, "Ws2_32.lib") +#else +#include <sys/types.h> +#include <sys/socket.h> +#include <netdb.h> +#include <unistd.h> +#endif +#include <mutex> +#include <atomic> +#include "io/BaseStream.h" +#include "core/Core.h" +#include "core/logging/Logger.h" +#include "io/validation.h" +#include "properties/Configure.h" +#include "io/NetworkPrioritizer.h" + + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +#ifdef WIN32 + typedef SOCKET SocketDescriptor; +#else + typedef int SocketDescriptor; + #define INVALID_SOCKET -1 +#endif + + +/** + * Context class for socket. This is currently only used as a parent class for TLSContext. It is necessary so the Socket and TLSSocket constructors + * can be the same. It also gives us a common place to set timeouts, etc from the Configure object in the future. 
+ */ +class SocketContext { + public: + SocketContext(const std::shared_ptr<Configure> &configure) { + } +}; +/** + * Socket class. + * Purpose: Provides a general purpose socket interface that abstracts + * connecting information from users + * Design: Extends DataStream and allows us to perform most streaming + * operations against a BSD socket + * + * + */ +class Socket : public BaseStream { +public: + /** + * Constructor that creates a client socket. + * @param context the SocketContext + * @param hostname hostname we are connecting to. + * @param port port we are connecting to. + */ + explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port); + + /** + * Move constructor. + */ + explicit Socket(const Socket &&); + + /** + * Static function to return the current machine's host name + */ + static std::string getMyHostName() { + static std::string HOSTNAME = init_hostname(); + return HOSTNAME; + } + + /** + * Destructor + */ + + virtual ~Socket(); + + virtual void closeStream(); + /** + * Initializes the socket + * @return result of the creation operation. + */ + virtual int16_t initialize(); + + virtual void setInterface(io::NetworkInterface &&ifc) { + local_network_interface_ = std::move(ifc); + } + + /** + * Sets the non blocking flag on the file descriptor. 
+ */ + void setNonBlocking(); + + std::string getHostname() const; + + /** + * Return the port for this socket + * @returns port + */ + uint16_t getPort(); + + // data stream extensions + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + * @param retrieve_all_bytes determines if we should read all bytes before returning + */ + virtual int readData(std::vector<uint8_t> &buf, int buflen) { + return readData(buf, buflen, true); + } + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + * @param retrieve_all_bytes determines if we should read all bytes before returning + */ + virtual int readData(uint8_t *buf, int buflen) { + return readData(buf, buflen, true); + } + + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + * @param retrieve_all_bytes determines if we should read all bytes before returning + */ + virtual int readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes); + /** + * Reads data and places it into buf + * @param buf buffer in which we extract data + * @param buflen + * @param retrieve_all_bytes determines if we should read all bytes before returning + */ + virtual int readData(uint8_t *buf, int buflen, bool retrieve_all_bytes); + + /** + * Write value to the stream using std::vector + * @param buf incoming buffer + * @param buflen buffer to write + * + */ + virtual int writeData(std::vector<uint8_t> &buf, int buflen); + + /** + * writes value to stream + * @param value value to write + * @param size size of value + */ + virtual int writeData(uint8_t *value, int size); + + /** + * Writes a system word + * @param value value to write + */ + virtual int write(uint64_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * Writes a uint32_t + * @param value value to write + */ + virtual int write(uint32_t value, bool is_little_endian = 
EndiannessCheck::IS_LITTLE); + + /** + * Writes a system short + * @param value value to write + */ + virtual int write(uint16_t value, bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * Reads a system word + * @param value value to write + */ + virtual int read(uint64_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * Reads a uint32_t + * @param value value to write + */ + virtual int read(uint32_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * Reads a system short + * @param value value to write + */ + virtual int read(uint16_t &value, bool is_little_endian = EndiannessCheck::IS_LITTLE); + + /** + * Returns the underlying buffer + * @return vector's array + **/ + const uint8_t *getBuffer() const { + return DataStream::getBuffer(); + } + + /** + * Retrieve size of data stream + * @return size of data stream + **/ + const uint64_t getSize() const { + return DataStream::getSize(); + } + +protected: + + /** + * Constructor that accepts host name, port and listeners. With this + * constructor we will be creating a server socket + * @param context the SocketContext + * @param hostname our host name + * @param port connecting port + * @param listeners number of listeners in the queue + */ + explicit Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners); + + /** + * Creates a vector and returns the vector using the provided + * type name. + * @param t incoming object + * @returns vector. + */ + template<typename T> + std::vector<uint8_t> readBuffer(const T&); + + /** + * Creates a connection using the address info object. + * @param p addrinfo structure. + * @returns fd. + */ +#ifdef WIN32 + virtual int8_t createConnection(const addrinfo *p, struct in_addr &addr); +#else + virtual int8_t createConnection(const addrinfo *p, in_addr_t &addr); +#endif + + /** + * Sets socket options depending on the instance. 
+ * @param sock socket file descriptor. + */ + virtual int16_t setSocketOptions(const SocketDescriptor sock); + + /** + * Attempt to select the socket file descriptor + * @param msec timeout interval to wait + * @returns file descriptor + */ + virtual int16_t select_descriptor(const uint16_t msec); + + + addrinfo *addr_info_; + + std::recursive_mutex selection_mutex_; + + std::string requested_hostname_; + std::string canonical_hostname_; + uint16_t port_; + + bool is_loopback_only_; + io::NetworkInterface local_network_interface_; + + // connection information + SocketDescriptor socket_file_descriptor_; + + fd_set total_list_; + fd_set read_fds_; + std::atomic<uint16_t> socket_max_; + std::atomic<uint64_t> total_written_; + std::atomic<uint64_t> total_read_; + uint16_t listeners_; + + bool nonBlocking_; +private: + + class SocketInitializer + { + public: + SocketInitializer() { + #ifdef WIN32 + static WSADATA s_wsaData; + int iWinSockInitResult = WSAStartup(MAKEWORD(2, 2), &s_wsaData); + #endif + } + ~SocketInitializer() { + #ifdef WIN32 + WSACleanup(); + #endif + } + }; + static void initialize_socket() { + static SocketInitializer initialized; + } + + + std::shared_ptr<logging::Logger> logger_; + + + static std::string init_hostname() { + char hostname[1024]; + gethostname(hostname, 1024); + Socket mySock(nullptr, hostname, 0); + mySock.initialize(); + auto resolved_hostname = mySock.getHostname(); + return !IsNullOrEmpty(resolved_hostname) ? 
resolved_hostname : hostname; + } +}; + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ +#endif /* LIBMINIFI_INCLUDE_IO_WIN__CLIENTSOCKET_H_ */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp index 9513f68..f9a8fcb 100644 --- a/libminifi/src/Connection.cpp +++ b/libminifi/src/Connection.cpp @@ -18,7 +18,6 @@ * limitations under the License. */ #include "Connection.h" -#include <sys/time.h> #include <time.h> #include <vector> #include <queue> @@ -39,17 +38,64 @@ namespace apache { namespace nifi { namespace minifi { -Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, uuid_t uuid, uuid_t srcUUID, - uuid_t destUUID) +Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name) + : core::Connectable(name), + flow_repository_(flow_repository), + content_repo_(content_repo), + logger_(logging::LoggerFactory<Connection>::getLogger()) { + source_connectable_ = nullptr; + dest_connectable_ = nullptr; + max_queue_size_ = 0; + max_data_queue_size_ = 0; + expired_duration_ = 0; + queued_data_size_ = 0; + + logger_->log_debug("Connection %s created", name_); +} + +Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid) + : core::Connectable(name, uuid), + flow_repository_(flow_repository), + content_repo_(content_repo), + logger_(logging::LoggerFactory<Connection>::getLogger()) { + source_connectable_ = nullptr; + dest_connectable_ = nullptr; + max_queue_size_ = 0; + 
max_data_queue_size_ = 0; + expired_duration_ = 0; + queued_data_size_ = 0; + + logger_->log_debug("Connection %s created", name_); +} + +Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid, + utils::Identifier & srcUUID) + : core::Connectable(name, uuid), + flow_repository_(flow_repository), + content_repo_(content_repo), + logger_(logging::LoggerFactory<Connection>::getLogger()) { + + src_uuid_ = srcUUID; + + source_connectable_ = nullptr; + dest_connectable_ = nullptr; + max_queue_size_ = 0; + max_data_queue_size_ = 0; + expired_duration_ = 0; + queued_data_size_ = 0; + + logger_->log_debug("Connection %s created", name_); +} + +Connection::Connection(const std::shared_ptr<core::Repository> &flow_repository, const std::shared_ptr<core::ContentRepository> &content_repo, std::string name, utils::Identifier & uuid, + utils::Identifier & srcUUID, utils::Identifier & destUUID) : core::Connectable(name, uuid), flow_repository_(flow_repository), content_repo_(content_repo), logger_(logging::LoggerFactory<Connection>::getLogger()) { - if (srcUUID) - uuid_copy(src_uuid_, srcUUID); - if (destUUID) - uuid_copy(dest_uuid_, destUUID); + src_uuid_ = srcUUID; + dest_uuid_ = destUUID; source_connectable_ = nullptr; dest_connectable_ = nullptr; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/FlowControlProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowControlProtocol.cpp b/libminifi/src/FlowControlProtocol.cpp index 60122b0..60f701a 100644 --- a/libminifi/src/FlowControlProtocol.cpp +++ b/libminifi/src/FlowControlProtocol.cpp @@ -18,10 +18,8 @@ * limitations under the License. 
*/ #include "FlowControlProtocol.h" -#include <sys/time.h> #include <stdio.h> #include <time.h> -#include <netinet/tcp.h> #include <chrono> #include <thread> #include <string> @@ -35,96 +33,12 @@ namespace nifi { namespace minifi { int FlowControlProtocol::connectServer(const char *host, uint16_t port) { - in_addr_t addr; - int sock = 0; - struct hostent *h; -#ifdef __MACH__ - h = gethostbyname(host); -#else - char buf[1024]; - struct hostent he; - int hh_errno; - gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); -#endif - memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length); - sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock < 0) { - logger_->log_error("Could not create socket to hostName %s", host); - return 0; - } - -#ifndef __MACH__ - int opt = 1; - bool nagle_off = true; - - if (nagle_off) { - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, reinterpret_cast<void*>(&opt), sizeof(opt)) < 0) { - logger_->log_error("setsockopt() TCP_NODELAY failed"); - close(sock); - return 0; - } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { - logger_->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return 0; - } - } - - int sndsize = 256 * 1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char*>(&sndsize), sizeof(sndsize)) < 0) { - logger_->log_error("setsockopt() SO_SNDBUF failed"); - close(sock); - return 0; - } -#endif - - struct sockaddr_in sa; - socklen_t socklen; - int status; - - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = htonl(INADDR_ANY); - sa.sin_port = htons(0); - socklen = sizeof(sa); - if (bind(sock, (struct sockaddr *) &sa, socklen) < 0) { - logger_->log_error("socket bind failed"); - close(sock); - return 0; - } - - memset(&sa, 0, sizeof(sa)); - sa.sin_family = AF_INET; - sa.sin_addr.s_addr = addr; - sa.sin_port = htons(port); - socklen = sizeof(sa); - - status = connect(sock, (struct sockaddr *) &sa, socklen); - - 
if (status < 0) { - logger_->log_error("socket connect failed to %s %ll", host, port); - close(sock); - return 0; - } - - logger_->log_debug("Flow Control Protocol socket %ll connect to server %s port %ll success", sock, host, port); - - return sock; + in_addr addr; + return 0; } int FlowControlProtocol::sendData(uint8_t *buf, int buflen) { - int ret = 0, bytes = 0; - - while (bytes < buflen) { - ret = send(_socket, buf + bytes, buflen - bytes, 0); - // check for errors - if (ret == -1) { - return ret; - } - bytes += ret; - } - - return bytes; + return 0; } int FlowControlProtocol::selectClient(int msec) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 12ce99f..c840f14 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -18,11 +18,9 @@ * limitations under the License. 
*/ #include "FlowController.h" -#include <sys/time.h> #include <time.h> #include <sys/types.h> #include <sys/stat.h> -#include <unistd.h> #include <vector> #include <queue> #include <map> @@ -56,6 +54,13 @@ #include "core/Connectable.h" #include "utils/HTTPClient.h" + +#ifdef _MSC_VER +#ifndef PATH_MAX +#define PATH_MAX 260 +#endif +#endif + namespace org { namespace apache { namespace nifi { @@ -63,7 +68,7 @@ namespace minifi { std::shared_ptr<utils::IdGenerator> FlowController::id_generator_ = utils::IdGenerator::getIdGenerator(); -#define DEFAULT_CONFIG_NAME "conf/flow.yml" +#define DEFAULT_CONFIG_NAME "conf/config.yml" FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, std::unique_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, const std::string name, bool headless_mode) @@ -133,8 +138,12 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo void FlowController::initializePaths(const std::string &adjustedFilename) { char *path = NULL; +#ifndef WIN32 char full_path[PATH_MAX]; path = realpath(adjustedFilename.c_str(), full_path); +#else + path = const_cast<char*>(adjustedFilename.c_str()); +#endif if (path == NULL) { throw std::runtime_error("Path is not specified. Either manually set MINIFI_HOME or ensure ../conf exists"); @@ -144,7 +153,7 @@ void FlowController::initializePaths(const std::string &adjustedFilename) { logger_->log_info("FlowController NiFi Configuration file %s", pathString); if (!path) { - logger_->log_error("Could not locate path from provided configuration file name (%s). Exiting.", full_path); + logger_->log_error("Could not locate path from provided configuration file name (%s). 
Exiting.", path); exit(1); } } @@ -540,7 +549,7 @@ void FlowController::loadC2ResponseConfiguration(const std::string &prefix) { std::string name; if (configuration_->get(nameOption.str(), name)) { - std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name, nullptr); + std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name); if (configuration_->get(classOption.str(), class_definitions)) { std::vector<std::string> classes = utils::StringUtils::split(class_definitions, ","); @@ -601,7 +610,7 @@ std::shared_ptr<state::response::ResponseNode> FlowController::loadC2ResponseCon std::string name; if (configuration_->get(nameOption.str(), name)) { - std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name, nullptr); + std::shared_ptr<state::response::ResponseNode> new_node = std::make_shared<state::response::ObjectNode>(name); if (name.find(",") != std::string::npos) { std::vector<std::string> sub_classes = utils::StringUtils::split(name, ","); for (std::string subClassStr : classes) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/FlowFileRecord.cpp b/libminifi/src/FlowFileRecord.cpp index 7815e70..8868c9a 100644 --- a/libminifi/src/FlowFileRecord.cpp +++ b/libminifi/src/FlowFileRecord.cpp @@ -18,7 +18,6 @@ * limitations under the License. 
*/ #include "FlowFileRecord.h" -#include <sys/time.h> #include <time.h> #include <cstdio> #include <vector> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/Properties.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/Properties.cpp b/libminifi/src/Properties.cpp index cb3b752..f5537fc 100644 --- a/libminifi/src/Properties.cpp +++ b/libminifi/src/Properties.cpp @@ -104,8 +104,12 @@ void Properties::loadConfigureFile(const char *fileName) { } } char *path = NULL; +#ifndef WIN32 char full_path[PATH_MAX]; path = realpath(adjustedFilename.c_str(), full_path); +#else + path = const_cast<char*>(adjustedFilename.c_str()); +#endif logger_->log_info("Using configuration file located at %s", path); std::ifstream file(path, std::ifstream::in); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/RemoteProcessorGroupPort.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/RemoteProcessorGroupPort.cpp b/libminifi/src/RemoteProcessorGroupPort.cpp index e52a2c2..8bed630 100644 --- a/libminifi/src/RemoteProcessorGroupPort.cpp +++ b/libminifi/src/RemoteProcessorGroupPort.cpp @@ -20,8 +20,6 @@ #include "RemoteProcessorGroupPort.h" -#include <curl/curl.h> -#include <curl/easy.h> #include <uuid/uuid.h> #include <algorithm> #include <cstdint> @@ -133,8 +131,8 @@ void RemoteProcessorGroupPort::initialize() { void RemoteProcessorGroupPort::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) { std::string value; - if (context->getProperty(portUUID.getName(), value)) { - uuid_parse(value.c_str(), protocol_uuid_); + if (context->getProperty(portUUID.getName(), value) && !value.empty()) { + protocol_uuid_ = value; } std::string context_name; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/c2/C2Agent.cpp 
---------------------------------------------------------------------- diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 2514906..1ef8eea 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -17,7 +17,6 @@ */ #include "c2/C2Agent.h" -#include <unistd.h> #include <csignal> #include <utility> #include <vector> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/c2/protocols/RESTProtocol.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp index 391ed81..d1e8b9a 100644 --- a/libminifi/src/c2/protocols/RESTProtocol.cpp +++ b/libminifi/src/c2/protocols/RESTProtocol.cpp @@ -33,6 +33,7 @@ namespace minifi { namespace c2 { const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const std::vector<char> &response) { +#ifndef WIN32 rapidjson::Document root; try { @@ -126,6 +127,7 @@ const C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, const } } catch (...) 
{ } +#endif return std::move(C2Payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true)); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/capi/C2CallbackAgent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/C2CallbackAgent.cpp b/libminifi/src/capi/C2CallbackAgent.cpp index d8da5d6..dd04466 100644 --- a/libminifi/src/capi/C2CallbackAgent.cpp +++ b/libminifi/src/capi/C2CallbackAgent.cpp @@ -17,7 +17,6 @@ */ #include "capi/C2CallbackAgent.h" -#include <unistd.h> #include <csignal> #include <utility> #include <vector> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/capi/Plan.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/capi/Plan.cpp b/libminifi/src/capi/Plan.cpp index 6181382..595a01a 100644 --- a/libminifi/src/capi/Plan.cpp +++ b/libminifi/src/capi/Plan.cpp @@ -22,6 +22,8 @@ #include <set> #include <string> +std::shared_ptr<utils::IdGenerator> ExecutionPlan::id_generator_ = utils::IdGenerator::getIdGenerator(); + ExecutionPlan::ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo) : content_repo_(content_repo), flow_repo_(flow_repo), @@ -38,8 +40,8 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_p return nullptr; } - uuid_t uuid; - uuid_generate(uuid); + utils::Identifier uuid; + id_generator_->generate(uuid); processor->setStreamFactory(stream_factory); // initialize the processor @@ -66,7 +68,7 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::shared_p connection->setSource(last); connection->setDestination(processor); - uuid_t uuid_copy, uuid_copy_next; + utils::Identifier uuid_copy, uuid_copy_next; last->getUUID(uuid_copy); connection->setSourceUUID(uuid_copy); processor->getUUID(uuid_copy_next); @@ 
-95,8 +97,8 @@ std::shared_ptr<core::Processor> ExecutionPlan::addProcessor(const std::string & return nullptr; } - uuid_t uuid; - uuid_generate(uuid); + utils::Identifier uuid; + id_generator_->generate(uuid); auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(processor_name, uuid); if (nullptr == ptr) { @@ -186,7 +188,7 @@ std::shared_ptr<minifi::Connection> ExecutionPlan::buildFinalConnection(std::sha if (setDest) connection->setDestination(processor); - uuid_t uuid_copy; + utils::Identifier uuid_copy; last->getUUID(uuid_copy); connection->setSourceUUID(uuid_copy); if (setDest) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/LinuxPowerManagementService.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/controllers/LinuxPowerManagementService.cpp b/libminifi/src/controllers/LinuxPowerManagementService.cpp index 836c9d3..a54cb7b 100644 --- a/libminifi/src/controllers/LinuxPowerManagementService.cpp +++ b/libminifi/src/controllers/LinuxPowerManagementService.cpp @@ -40,7 +40,7 @@ bool LinuxPowerManagerService::isAboveMax(int new_tasks) { } uint16_t LinuxPowerManagerService::getMaxThreads() { - return std::numeric_limits<uint16_t>::max(); + return (std::numeric_limits<uint16_t>::max)(); } bool LinuxPowerManagerService::canIncrease() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/NetworkPrioritizerService.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/controllers/NetworkPrioritizerService.cpp b/libminifi/src/controllers/NetworkPrioritizerService.cpp index 63210ec..d92758a 100644 --- a/libminifi/src/controllers/NetworkPrioritizerService.cpp +++ b/libminifi/src/controllers/NetworkPrioritizerService.cpp @@ -21,15 +21,18 @@ #include <limits> #include <string> #include <vector> -#include <sys/ioctl.h> +#ifndef WIN32 #include <ifaddrs.h> #include <net/if.h> 
+#include <sys/ioctl.h> #include <netinet/in.h> -#include <string.h> #include <sys/socket.h> #include <netdb.h> -#include <stdlib.h> #include <unistd.h> +#endif +#include <string.h> +#include <stdlib.h> + #include <set> #include "utils/StringUtils.h" #if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) @@ -112,6 +115,7 @@ std::string NetworkPrioritizerService::get_nearest_interface(const std::vector<s } bool NetworkPrioritizerService::interface_online(const std::string &ifc) { +#ifndef WIN32 struct ifreq ifr; auto sockid = socket(PF_INET6, SOCK_DGRAM, IPPROTO_IP); memset(&ifr, 0, sizeof(ifr)); @@ -121,6 +125,9 @@ bool NetworkPrioritizerService::interface_online(const std::string &ifc) { } close(sockid); return (ifr.ifr_flags & IFF_UP) && (ifr.ifr_flags & IFF_RUNNING); +#else + return false; +#endif } std::vector<std::string> NetworkPrioritizerService::getInterfaces(uint32_t size = 0) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/SSLContextService.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/controllers/SSLContextService.cpp b/libminifi/src/controllers/SSLContextService.cpp index 352ba37..8d0a997 100644 --- a/libminifi/src/controllers/SSLContextService.cpp +++ b/libminifi/src/controllers/SSLContextService.cpp @@ -17,8 +17,11 @@ */ #include "controllers/SSLContextService.h" + +#ifdef OPENSSL_SUPPORT #include <openssl/err.h> #include <openssl/ssl.h> +#endif #include <string> #include <memory> #include <set> @@ -44,7 +47,13 @@ void SSLContextService::initialize() { initialized_ = true; } +/** + * If OpenSSL is not installed we may still continue operations. Nullptr will + * be returned and it will be up to the caller to determine if this failure is + * recoverable. 
+ */ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() { +#ifdef OPENSSL_SUPPORT SSL_library_init(); const SSL_METHOD *method; @@ -82,6 +91,9 @@ std::unique_ptr<SSLContext> SSLContextService::createSSLContext() { logger_->log_error("Can not load CA certificate %s, Exiting, error : %s", ca_certificate_, std::strerror(errno)); } return std::unique_ptr<SSLContext>(new SSLContext(ctx)); +#else + return nullptr; +#endif } const std::string &SSLContextService::getCertificateFile() { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/controllers/UpdatePolicyControllerService.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/controllers/UpdatePolicyControllerService.cpp b/libminifi/src/controllers/UpdatePolicyControllerService.cpp index 08faad2..ae56fe1 100644 --- a/libminifi/src/controllers/UpdatePolicyControllerService.cpp +++ b/libminifi/src/controllers/UpdatePolicyControllerService.cpp @@ -21,15 +21,8 @@ #include <limits> #include <string> #include <vector> -#include <sys/ioctl.h> -#include <ifaddrs.h> -#include <net/if.h> -#include <netinet/in.h> #include <string.h> -#include <sys/socket.h> -#include <netdb.h> #include <stdlib.h> -#include <unistd.h> #include <set> #include "utils/StringUtils.h" #if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD)) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ClassLoader.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ClassLoader.cpp b/libminifi/src/core/ClassLoader.cpp index 5c2fdf8..9c1aa67 100644 --- a/libminifi/src/core/ClassLoader.cpp +++ b/libminifi/src/core/ClassLoader.cpp @@ -15,10 +15,8 @@ * limitations under the License. 
*/ -#include <sys/mman.h> #include <memory> #include <string> - #include "core/ClassLoader.h" namespace org { @@ -35,6 +33,7 @@ ClassLoader &ClassLoader::getDefaultClassLoader() { // populate ret return ret; } + uint16_t ClassLoader::registerResource(const std::string &resource, const std::string &resourceFunction) { void *resource_ptr = nullptr; if (resource.empty()) { @@ -57,7 +56,7 @@ uint16_t ClassLoader::registerResource(const std::string &resource, const std::s // load the symbols createFactory* create_factory_func = reinterpret_cast<createFactory*>(dlsym(resource_ptr, resourceFunction.c_str())); const char* dlsym_error = dlerror(); - if (dlsym_error) { + if ((dlsym_error != nullptr && strlen(dlsym_error) > 0) || create_factory_func == nullptr) { return RESOURCE_FAILURE; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ConfigurableComponent.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ConfigurableComponent.cpp b/libminifi/src/core/ConfigurableComponent.cpp index ff6d1e5..c413ab9 100644 --- a/libminifi/src/core/ConfigurableComponent.cpp +++ b/libminifi/src/core/ConfigurableComponent.cpp @@ -191,6 +191,7 @@ bool ConfigurableComponent::createDynamicProperty(const std::string &name, const } Property new_property(name, DEFAULT_DYNAMIC_PROPERTY_DESC, value, false, "", {}, {}); + new_property.setSupportsExpressionLanguage(true); logger_->log_info("Processor %s dynamic property '%s' value '%s'", name.c_str(), new_property.getName().c_str(), value.c_str()); dynamic_properties_[new_property.getName()] = new_property; onDynamicPropertyModified({}, new_property); @@ -205,6 +206,7 @@ bool ConfigurableComponent::setDynamicProperty(const std::string name, std::stri Property &orig_property = it->second; Property new_property = orig_property; new_property.setValue(value); + new_property.setSupportsExpressionLanguage(true); dynamic_properties_[new_property.getName()] = 
new_property; onDynamicPropertyModified(orig_property, new_property); logger_->log_debug("Component %s dynamic property name %s value %s", name, new_property.getName(), value); @@ -222,6 +224,7 @@ bool ConfigurableComponent::updateDynamicProperty(const std::string &name, const Property &orig_property = it->second; Property new_property = orig_property; new_property.addValue(value); + new_property.setSupportsExpressionLanguage(true); dynamic_properties_[new_property.getName()] = new_property; onDynamicPropertyModified(orig_property, new_property); logger_->log_debug("Component %s dynamic property name %s value %s", name, new_property.getName(), value); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Connectable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Connectable.cpp b/libminifi/src/core/Connectable.cpp index 2a98818..a5fb1b2 100644 --- a/libminifi/src/core/Connectable.cpp +++ b/libminifi/src/core/Connectable.cpp @@ -30,13 +30,21 @@ namespace nifi { namespace minifi { namespace core { -Connectable::Connectable(std::string name, uuid_t uuid) +Connectable::Connectable(std::string name, utils::Identifier &uuid) : CoreComponent(name, uuid), max_concurrent_tasks_(1), connectable_version_(nullptr), logger_(logging::LoggerFactory<Connectable>::getLogger()) { } +Connectable::Connectable(std::string name) + : CoreComponent(name), + max_concurrent_tasks_(1), + connectable_version_(nullptr), + logger_(logging::LoggerFactory<Connectable>::getLogger()) { +} + + Connectable::Connectable(const Connectable &&other) : CoreComponent(std::move(other)), max_concurrent_tasks_(std::move(other.max_concurrent_tasks_)), http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Core.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp index da9462b..e5c8d6c 100644 
--- a/libminifi/src/core/Core.cpp +++ b/libminifi/src/core/Core.cpp @@ -29,31 +29,29 @@ namespace core { std::shared_ptr<utils::IdGenerator> CoreComponent::id_generator_ = utils::IdGenerator::getIdGenerator(); // Set UUID -void CoreComponent::setUUID(uuid_t uuid) { - uuid_copy(uuid_, uuid); - char uuidStr[37]; - uuid_unparse_lower(uuid_, uuidStr); - uuidStr_ = uuidStr; +void CoreComponent::setUUID(utils::Identifier &uuid) { + uuid_ = uuid; + uuidStr_ = uuid_.to_string(); } void CoreComponent::setUUIDStr(const std::string uuidStr) { - uuid_parse(uuidStr.c_str(), uuid_); + uuid_ = uuidStr; uuidStr_ = uuidStr; } // Get UUID -bool CoreComponent::getUUID(uuid_t uuid) { - if (uuid) { - uuid_copy(uuid, uuid_); - return true; - } else { +bool CoreComponent::getUUID(utils::Identifier &uuid) { + if (uuid_ == nullptr) { return false; } + uuid = uuid_; + return true; } // Get UUID -unsigned const char *CoreComponent::getUUID() { - return uuid_; -} +/* + unsigned const char *CoreComponent::getUUID() { + return uuid_.getIdentifier(); + }*/ // Set Processor Name void CoreComponent::setName(const std::string name) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/FlowConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 320797b..aea6d62 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -28,13 +28,15 @@ namespace nifi { namespace minifi { namespace core { -std::vector<std::string> FlowConfiguration::statics_sl_funcs_; -std::mutex FlowConfiguration::atomic_initialization_; +static_initializers &get_static_functions() { + static static_initializers static_sl_funcs; + return static_sl_funcs; +} FlowConfiguration::~FlowConfiguration() { } -std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, uuid_t uuid) { 
+std::shared_ptr<core::Processor> FlowConfiguration::createProcessor(std::string name, utils::Identifier & uuid) { auto ptr = core::ClassLoader::getDefaultClassLoader().instantiate(name, uuid); if (nullptr == ptr) { logger_->log_error("No Processor defined for %s", name); @@ -60,47 +62,47 @@ std::shared_ptr<core::Processor> FlowConfiguration::createProvenanceReportTask() } std::unique_ptr<core::ProcessGroup> FlowConfiguration::updateFromPayload(const std::string &source, const std::string &yamlConfigPayload) { - if (!source.empty()) { - std::string host, protocol, path, query, url = source; - int port; - utils::parse_url(&url, &host, &port, &protocol, &path, &query); - - std::string flow_id, bucket_id; - auto path_split = utils::StringUtils::split(path, "/"); - for (size_t i = 0; i < path_split.size(); i++) { - const std::string &str = path_split.at(i); - if (str == "flows") { - if (i + 1 < path_split.size()) { - flow_id = path_split.at(i + 1); - i++; - } + if (!source.empty()) { + std::string host, protocol, path, query, url = source; + int port; + utils::parse_url(&url, &host, &port, &protocol, &path, &query); + + std::string flow_id, bucket_id; + auto path_split = utils::StringUtils::split(path, "/"); + for (size_t i = 0; i < path_split.size(); i++) { + const std::string &str = path_split.at(i); + if (str == "flows") { + if (i + 1 < path_split.size()) { + flow_id = path_split.at(i + 1); + i++; } + } - if (str == "bucket") { - if (i + 1 < path_split.size()) { - bucket_id = path_split.at(i + 1); - i++; - } + if (str == "bucket") { + if (i + 1 < path_split.size()) { + bucket_id = path_split.at(i + 1); + i++; } } - flow_version_->setFlowVersion(url, bucket_id, flow_id); } - return getRootFromPayload(yamlConfigPayload); + flow_version_->setFlowVersion(url, bucket_id, flow_id); } + return getRootFromPayload(yamlConfigPayload); +} -std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, uuid_t uuid, int version) { 
+std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRootProcessGroup(std::string name, utils::Identifier & uuid, int version) { return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::ROOT_PROCESS_GROUP, name, uuid, version)); } -std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, uuid_t uuid) { +std::unique_ptr<core::ProcessGroup> FlowConfiguration::createRemoteProcessGroup(std::string name, utils::Identifier & uuid) { return std::unique_ptr<core::ProcessGroup>(new core::ProcessGroup(core::REMOTE_PROCESS_GROUP, name, uuid)); } -std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, uuid_t uuid) { +std::shared_ptr<minifi::Connection> FlowConfiguration::createConnection(std::string name, utils::Identifier & uuid) { return std::make_shared<minifi::Connection>(flow_file_repo_, content_repo_, name, uuid); } -std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, uuid_t uuid) { +std::shared_ptr<core::controller::ControllerServiceNode> FlowConfiguration::createControllerService(const std::string &class_name, const std::string &name, utils::Identifier & uuid) { std::shared_ptr<core::controller::ControllerServiceNode> controllerServicesNode = service_provider_->createControllerService(class_name, name, true); if (nullptr != controllerServicesNode) controllerServicesNode->setUUID(uuid); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/FlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/FlowFile.cpp b/libminifi/src/core/FlowFile.cpp index 3427559..d41d9b0 100644 --- a/libminifi/src/core/FlowFile.cpp +++ b/libminifi/src/core/FlowFile.cpp @@ -33,7 +33,7 @@ std::shared_ptr<utils::IdGenerator> FlowFile::id_generator_ = utils::IdGenerator std::shared_ptr<logging::Logger> 
FlowFile::logger_ = logging::LoggerFactory<FlowFile>::getLogger(); FlowFile::FlowFile() - : Connectable("FlowFile", 0), + : Connectable("FlowFile"), size_(0), id_(0), stored(false), @@ -54,7 +54,7 @@ FlowFile::~FlowFile() { } FlowFile& FlowFile::operator=(const FlowFile& other) { - uuid_copy(uuid_, other.uuid_); + uuid_ = other.uuid_; stored = other.stored; marked_delete_ = other.marked_delete_; entry_date_ = other.entry_date_; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 205f9f2..7df1b09 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -18,7 +18,6 @@ * limitations under the License. */ #include "core/ProcessGroup.h" -#include <sys/time.h> #include <time.h> #include <vector> #include <memory> @@ -39,17 +38,40 @@ namespace core { std::shared_ptr<utils::IdGenerator> ProcessGroup::id_generator_ = utils::IdGenerator::getIdGenerator(); -ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, int version, ProcessGroup *parent) +ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid) + : ProcessGroup(type, name, uuid, 0, 0) { +} + +ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid, int version) + : ProcessGroup(type, name, uuid, version, 0) { +} + +ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, utils::Identifier &uuid, int version, ProcessGroup *parent) : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()), name_(name), type_(type), config_version_(version), parent_process_group_(parent) { - if (!uuid) - // Generate the global UUID for the flow record + if (uuid == nullptr) { id_generator_->generate(uuid_); - else - uuid_copy(uuid_, uuid); + } else { + uuid_ = uuid; + } + + 
yield_period_msec_ = 0; + transmitting_ = false; + transport_protocol_ = "RAW"; + + logger_->log_debug("ProcessGroup %s created", name_); +} + +ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name) + : logger_(logging::LoggerFactory<ProcessGroup>::getLogger()), + name_(name), + type_(type), + config_version_(0), + parent_process_group_(0) { + id_generator_->generate(uuid_); yield_period_msec_ = 0; transmitting_ = false; @@ -168,20 +190,15 @@ void ProcessGroup::stopProcessing(TimerDrivenSchedulingAgent *timeScheduler, Eve } } -std::shared_ptr<Processor> ProcessGroup::findProcessor(uuid_t uuid) { +std::shared_ptr<Processor> ProcessGroup::findProcessor(utils::Identifier &uuid) { std::lock_guard<std::recursive_mutex> lock(mutex_); std::shared_ptr<Processor> ret = NULL; for (auto processor : processors_) { logger_->log_debug("find processor %s", processor->getName()); - uuid_t processorUUID; + utils::Identifier processorUUID; if (processor->getUUID(processorUUID)) { - char uuid_str[37]; // ex. 
"1b4e28ba-2fa1-11d2-883f-0016d3cca427" + "\0" - uuid_unparse_lower(processorUUID, uuid_str); - std::string processorUUIDstr = uuid_str; - uuid_unparse_lower(uuid, uuid_str); - std::string uuidStr = uuid_str; - if (processorUUIDstr == uuidStr) { + if (uuid == processorUUID) { return processor; } } @@ -277,14 +294,14 @@ void ProcessGroup::addConnection(std::shared_ptr<Connection> connection) { // We do not have the same connection in this process group yet connections_.insert(connection); logger_->log_debug("Add connection %s into process group %s", connection->getName(), name_); - uuid_t sourceUUID; + utils::Identifier sourceUUID; std::shared_ptr<Processor> source = NULL; connection->getSourceUUID(sourceUUID); source = this->findProcessor(sourceUUID); if (source) source->addConnection(connection); std::shared_ptr<Processor> destination = NULL; - uuid_t destinationUUID; + utils::Identifier destinationUUID; connection->getDestinationUUID(destinationUUID); destination = this->findProcessor(destinationUUID); if (destination && destination != source) @@ -299,14 +316,14 @@ void ProcessGroup::removeConnection(std::shared_ptr<Connection> connection) { // We do not have the same connection in this process group yet connections_.erase(connection); logger_->log_debug("Remove connection %s into process group %s", connection->getName(), name_); - uuid_t sourceUUID; + utils::Identifier sourceUUID; std::shared_ptr<Processor> source = NULL; connection->getSourceUUID(sourceUUID); source = this->findProcessor(sourceUUID); if (source) source->removeConnection(connection); std::shared_ptr<Processor> destination = NULL; - uuid_t destinationUUID; + utils::Identifier destinationUUID; connection->getDestinationUUID(destinationUUID); destination = this->findProcessor(destinationUUID); if (destination && destination != source) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ProcessSession.cpp 
---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessSession.cpp b/libminifi/src/core/ProcessSession.cpp index 1bfa9f8..dc45446 100644 --- a/libminifi/src/core/ProcessSession.cpp +++ b/libminifi/src/core/ProcessSession.cpp @@ -19,7 +19,6 @@ */ #include "core/ProcessSession.h" #include "core/ProcessSessionReadCallback.h" -#include <sys/time.h> #include <time.h> #include <vector> #include <queue> @@ -31,6 +30,25 @@ #include <thread> #include <iostream> #include <uuid/uuid.h> +/* This implementation is only for native Windows systems. */ +#if (defined _WIN32 || defined __WIN32__) && !defined __CYGWIN__ +#define _WINSOCKAPI_ +#ifndef WIN32_LEAN_AND_MEAN +#define WIN32_LEAN_AND_MEAN +#endif +#include <WinSock2.h> +#include <WS2tcpip.h> +#include <Windows.h> +#pragma comment(lib, "Ws2_32.lib") +#include <direct.h> + +int getpagesize(void) { + return 4096; + // SYSTEM_INFO system_info; + // GetSystemInfo(&system_info); + // return system_info.dwPageSize; +} +#endif namespace org { namespace apache { @@ -619,12 +637,10 @@ bool ProcessSession::exportContent(const std::string &destination, const std::st } bool ProcessSession::exportContent(const std::string &destination, const std::shared_ptr<core::FlowFile> &flow, bool keepContent) { - char tmpFileUuidStr[37]; - uuid_t tmpFileUuid; + utils::Identifier tmpFileUuid; id_generator_->generate(tmpFileUuid); - uuid_unparse_lower(tmpFileUuid, tmpFileUuidStr); std::stringstream tmpFileSs; - tmpFileSs << destination << "." << tmpFileUuidStr; + tmpFileSs << destination << "." 
<< tmpFileUuid.to_string(); std::string tmpFileName = tmpFileSs.str(); return exportContent(destination, tmpFileName, flow, keepContent); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Processor.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index 7c0fe25..480ae58 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -18,7 +18,6 @@ * limitations under the License. */ #include "core/Processor.h" -#include <sys/time.h> #include <time.h> #include <vector> #include <queue> @@ -45,8 +44,8 @@ namespace nifi { namespace minifi { namespace core { -Processor::Processor(std::string name, uuid_t uuid) - : Connectable(name, uuid), +Processor::Processor(std::string name) + : Connectable(name), ConfigurableComponent(), logger_(logging::LoggerFactory<Processor>::getLogger()) { has_work_.store(false); @@ -66,6 +65,27 @@ Processor::Processor(std::string name, uuid_t uuid) logger_->log_debug("Processor %s created UUID %s", name_, uuidStr_); } +Processor::Processor(std::string name, utils::Identifier &uuid) + : Connectable(name, uuid), + ConfigurableComponent(), + logger_(logging::LoggerFactory<Processor>::getLogger()) { + has_work_.store(false); + // Setup the default values + state_ = DISABLED; + strategy_ = TIMER_DRIVEN; + loss_tolerant_ = false; + _triggerWhenEmpty = false; + scheduling_period_nano_ = MINIMUM_SCHEDULING_NANOS; + run_duration_nano_ = DEFAULT_RUN_DURATION; + yield_period_msec_ = DEFAULT_YIELD_PERIOD_SECONDS * 1000; + _penalizationPeriodMsec = DEFAULT_PENALIZATION_PERIOD_SECONDS * 1000; + max_concurrent_tasks_ = DEFAULT_MAX_CONCURRENT_TASKS; + active_tasks_ = 0; + yield_expiration_ = 0; + incoming_connections_Iter = this->_incomingConnections.begin(); + logger_->log_debug("Processor %s created UUID %s with uuid %s", name_, uuidStr_, uuid.to_string()); +} + bool Processor::isRunning() { 
return (state_ == RUNNING && active_tasks_ > 0); } @@ -87,17 +107,13 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); std::lock_guard<std::mutex> lock(mutex_); - uuid_t srcUUID; - uuid_t destUUID; + utils::Identifier srcUUID; + utils::Identifier destUUID; connection->getSourceUUID(srcUUID); connection->getDestinationUUID(destUUID); - char uuid_str[37]; - - uuid_unparse_lower(uuid_, uuid_str); - std::string my_uuid = uuid_str; - uuid_unparse_lower(destUUID, uuid_str); - std::string destination_uuid = uuid_str; + std::string my_uuid = uuid_.to_string(); + std::string destination_uuid = destUUID.to_string(); if (my_uuid == destination_uuid) { // Connection is destination to the current processor if (_incomingConnections.find(connection) == _incomingConnections.end()) { @@ -108,8 +124,7 @@ bool Processor::addConnection(std::shared_ptr<Connectable> conn) { ret = true; } } - uuid_unparse_lower(srcUUID, uuid_str); - std::string source_uuid = uuid_str; + std::string source_uuid = srcUUID.to_string(); if (my_uuid == source_uuid) { std::string relationship = connection->getRelationship().getName(); // Connection is source from the current processor @@ -147,15 +162,15 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { std::lock_guard<std::mutex> lock(mutex_); - uuid_t srcUUID; - uuid_t destUUID; + utils::Identifier srcUUID; + utils::Identifier destUUID; std::shared_ptr<Connection> connection = std::static_pointer_cast<Connection>(conn); connection->getSourceUUID(srcUUID); connection->getDestinationUUID(destUUID); - if (uuid_compare(uuid_, destUUID) == 0) { + if (uuid_ == destUUID) { // Connection is destination to the current processor if (_incomingConnections.find(connection) != _incomingConnections.end()) { _incomingConnections.erase(connection); @@ -165,7 +180,7 @@ void Processor::removeConnection(std::shared_ptr<Connectable> conn) { } } - if 
(uuid_compare(uuid_, srcUUID) == 0) { + if (uuid_ == srcUUID) { std::string relationship = connection->getRelationship().getName(); // Connection is source from the current processor auto &&it = out_going_connections_.find(relationship);