http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ProcessorNode.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp index 0a21f4d..81f8c9b 100644 --- a/libminifi/src/core/ProcessorNode.cpp +++ b/libminifi/src/core/ProcessorNode.cpp @@ -26,17 +26,17 @@ namespace core { ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> &processor) : processor_(processor), - Connectable(processor->getName(), 0), + Connectable(processor->getName()), ConfigurableComponent() { - uuid_t copy; + utils::Identifier copy; processor->getUUID(copy); setUUID(copy); } ProcessorNode::ProcessorNode(const ProcessorNode &other) : processor_(other.processor_), - Connectable(other.getName(), 0) { - uuid_t copy; + Connectable(other.getName()) { + utils::Identifier copy; processor_->getUUID(copy); setUUID(copy); }
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Property.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Property.cpp b/libminifi/src/core/Property.cpp index bb84e44..f3205d6 100644 --- a/libminifi/src/core/Property.cpp +++ b/libminifi/src/core/Property.cpp @@ -50,6 +50,10 @@ bool Property::getRequired() const { return is_required_; } +bool Property::supportsExpressionLangauge() const { + return supports_el_; +} + std::string Property::getValidRegex() const { return valid_regex_; } @@ -67,6 +71,10 @@ void Property::setValue(std::string value) { } } +void Property::setSupportsExpressionLanguage(bool supportEl) { + supports_el_ = supportEl; +} + void Property::addValue(std::string value) { values_.push_back(std::move(value)); } @@ -86,6 +94,7 @@ const Property &Property::operator=(const Property &other) { valid_regex_ = other.valid_regex_; dependent_properties_ = other.dependent_properties_; exclusive_of_properties_ = other.exclusive_of_properties_; + supports_el_ = other.supports_el_; return *this; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Repository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp index 1432d9e..8c5fa43 100644 --- a/libminifi/src/core/Repository.cpp +++ b/libminifi/src/core/Repository.cpp @@ -16,7 +16,6 @@ * limitations under the License. 
*/ #include "core/Repository.h" -#include <arpa/inet.h> #include <cstdint> #include <vector> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/logging/LoggerConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp index a0a207a..ef5be53 100644 --- a/libminifi/src/core/logging/LoggerConfiguration.cpp +++ b/libminifi/src/core/logging/LoggerConfiguration.cpp @@ -20,7 +20,6 @@ #include "core/logging/LoggerConfiguration.h" #include <sys/stat.h> -#include <unistd.h> #include <algorithm> #include <vector> #include <queue> @@ -34,6 +33,13 @@ #include "spdlog/spdlog.h" #include "spdlog/sinks/stdout_sinks.h" #include "spdlog/sinks/null_sink.h" +#ifdef WIN32 +#define stat _stat +#include <direct.h> +#define _WINSOCKAPI_ +#include <windows.h> +#include <tchar.h> +#endif namespace org { namespace apache { @@ -120,10 +126,17 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_names // Create the log directory if needed directory += "/logs"; struct stat logDirStat; +#ifdef WIN32 + if (stat(directory.c_str(), &logDirStat) != 0) { + if (_mkdir(directory.c_str()) == -1) { + exit(1); + } +#else if (stat(directory.c_str(), &logDirStat) != 0 || !S_ISDIR(logDirStat.st_mode)) { if (mkdir(directory.c_str(), 0777) == -1) { exit(1); } +#endif } file_name = directory + "/" + file_name; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/repository/VolatileContentRepository.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp index 0e99311..47c7ba6 100644 --- a/libminifi/src/core/repository/VolatileContentRepository.cpp +++ b/libminifi/src/core/repository/VolatileContentRepository.cpp @@ 
-17,6 +17,7 @@ */ #include "core/repository/VolatileContentRepository.h" +#include "capi/expect.h" #include <cstdio> #include <string> #include <memory> @@ -102,7 +103,7 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar } int size = 0; - if (__builtin_expect(minimize_locking_ == true, 1)) { + if (LIKELY(minimize_locking_ == true)) { for (auto ent : value_vector_) { if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) { std::lock_guard<std::mutex> lock(map_mutex_); @@ -113,15 +114,15 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar size++; } } else { - std::lock_guard < std::mutex > lock(map_mutex_); + std::lock_guard<std::mutex> lock(map_mutex_); auto claim_check = master_list_.find(claim->getContentFullPath()); if (claim_check != master_list_.end()) { - return std::make_shared < io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, claim_check->second); + return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, claim_check->second); } else { AtomicEntry<std::shared_ptr<minifi::ResourceClaim>> *ent = new AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>(&current_size_, &max_size_); if (ent->testAndSetKey(claim, nullptr, nullptr, resource_claim_comparator_)) { master_list_[claim->getContentFullPath()] = ent; - return std::make_shared< io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent); + return std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent); } } } @@ -158,7 +159,7 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::share } bool VolatileContentRepository::remove(const std::shared_ptr<minifi::ResourceClaim> &claim) { - if (__builtin_expect(minimize_locking_ == true, 1)) { + if (LIKELY(minimize_locking_ == true)) { std::lock_guard<std::mutex> lock(map_mutex_); auto ent = master_list_.find(claim->getContentFullPath()); if (ent != 
master_list_.end()) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/yaml/YamlConfiguration.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp index ef3a81f..af88082 100644 --- a/libminifi/src/core/yaml/YamlConfiguration.cpp +++ b/libminifi/src/core/yaml/YamlConfiguration.cpp @@ -35,7 +35,7 @@ namespace core { std::shared_ptr<utils::IdGenerator> YamlConfiguration::id_generator_ = utils::IdGenerator::getIdGenerator(); core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node rootFlowNode) { - uuid_t uuid; + utils::Identifier uuid; int version = 0; checkRequiredField(&rootFlowNode, "name", @@ -50,7 +50,7 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root } std::string id = getOrGenerateId(&rootFlowNode); - uuid_parse(id.c_str(), uuid); + uuid = id; if (rootFlowNode["version"]) { version = rootFlowNode["version"].as<int>(); @@ -69,7 +69,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: int64_t penalizationPeriod = -1; int64_t yieldPeriod = -1; int64_t runDurationNanos = -1; - uuid_t uuid; + utils::Identifier uuid; std::shared_ptr<core::Processor> processor = nullptr; if (!parentGroup) { @@ -97,7 +97,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: registerResource(lib_location_str, lib_function_str); } - uuid_parse(procCfg.id.c_str(), uuid); + uuid = procCfg.id.c_str(); logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id); checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY); procCfg.javaClass = procNode["class"].as<std::string>(); @@ -130,6 +130,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: auto periodNode = getOptionalField(&procNode, "scheduling period", 
YAML::Node(DEFAULT_SCHEDULING_PERIOD_STR), CONFIG_YAML_PROCESSORS_KEY); + procCfg.schedulingPeriod = periodNode.as<std::string>(); logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod); @@ -235,7 +236,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core:: } void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::ProcessGroup *parentGroup) { - uuid_t uuid; + utils::Identifier uuid; std::string id; if (!parentGroup) { @@ -255,16 +256,17 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => [%s]", name, id); - checkRequiredField(&currRpgNode, "url", + auto urlNode = getOptionalField(&currRpgNode, "url", YAML::Node(""), CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY); - std::string url = currRpgNode["url"].as<std::string>(); + + std::string url = urlNode.as<std::string>(); logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url); core::ProcessGroup *group = NULL; core::TimeUnit unit; int64_t timeoutValue = -1; int64_t yieldPeriodValue = -1; - uuid_parse(id.c_str(), uuid); + uuid = id; group = this->createRemoteProcessGroup(name.c_str(), uuid).release(); group->setParent(parentGroup); parentGroup->addProcessGroup(group); @@ -355,7 +357,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P } void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, core::ProcessGroup *parentGroup) { - uuid_t port_uuid; + utils::Identifier port_uuid; int64_t schedulingPeriod = -1; if (!parentGroup) { @@ -419,7 +421,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor auto batchSizeStr = node["batch size"].as<std::string>(); logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr); - uuid_parse(portUUIDStr.c_str(), port_uuid); + port_uuid = portUUIDStr; reportTask->setPortUUID(port_uuid); if 
(core::Property::StringToInt(batchSizeStr, lvalue)) { @@ -450,8 +452,8 @@ void YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNo auto id = controllerServiceNode["id"].as<std::string>(); auto type = controllerServiceNode["class"].as<std::string>(); - uuid_t uuid; - uuid_parse(id.c_str(), uuid); + utils::Identifier uuid; + uuid = id; auto controller_service_node = createControllerService(type, name, uuid); if (nullptr != controller_service_node) { logger_->log_debug("Created Controller Service with UUID %s and name %s", id, name); @@ -488,7 +490,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P std::shared_ptr<minifi::Connection> connection = nullptr; // Configure basic connection - uuid_t uuid; + utils::Identifier uuid; std::string id = getOrGenerateId(&connectionNode); // Default name to be same as ID @@ -499,7 +501,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P name = connectionNode["name"].as<std::string>(); } - uuid_parse(id.c_str(), uuid); + uuid = id; connection = this->createConnection(name, uuid); logger_->log_debug("Created connection with UUID %s and name %s", id, name); @@ -526,7 +528,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P } } - uuid_t srcUUID; + utils::Identifier srcUUID; if (connectionNode["max work queue size"]) { auto max_work_queue_str = connectionNode["max work queue size"].as<std::string>(); @@ -548,7 +550,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P if (connectionNode["source id"]) { std::string connectionSrcProcId = connectionNode["source id"].as<std::string>(); - uuid_parse(connectionSrcProcId.c_str(), srcUUID); + srcUUID = connectionSrcProcId; logger_->log_debug("Using 'source id' to match source with same id for " "connection '%s': source id => [%s]", name, connectionSrcProcId); @@ -557,10 +559,11 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node 
*connectionsNode, core::P checkRequiredField(&connectionNode, "source name", CONFIG_YAML_CONNECTIONS_KEY); std::string connectionSrcProcName = connectionNode["source name"].as<std::string>(); - uuid_t tmpUUID; - if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) && NULL != parent->findProcessor(tmpUUID)) { + utils::Identifier tmpUUID; + tmpUUID = connectionSrcProcName; + if (NULL != parent->findProcessor(tmpUUID)) { // the source name is a remote port id, so use that as the source id - uuid_copy(srcUUID, tmpUUID); + srcUUID = tmpUUID; logger_->log_debug("Using 'source name' containing a remote port id to match the source for " "connection '%s': source name => [%s]", name, connectionSrcProcName); @@ -582,10 +585,10 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P connection->setSourceUUID(srcUUID); // Configure connection destination - uuid_t destUUID; + utils::Identifier destUUID; if (connectionNode["destination id"]) { std::string connectionDestProcId = connectionNode["destination id"].as<std::string>(); - uuid_parse(connectionDestProcId.c_str(), destUUID); + destUUID = connectionDestProcId; logger_->log_debug("Using 'destination id' to match destination with same id for " "connection '%s': destination id => [%s]", name, connectionDestProcId); @@ -595,11 +598,11 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P checkRequiredField(&connectionNode, "destination name", CONFIG_YAML_CONNECTIONS_KEY); std::string connectionDestProcName = connectionNode["destination name"].as<std::string>(); - uuid_t tmpUUID; - if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) && - NULL != parent->findProcessor(tmpUUID)) { + utils::Identifier tmpUUID; + tmpUUID = connectionDestProcName; + if (parent->findProcessor(tmpUUID)) { // the destination name is a remote port id, so use that as the dest id - uuid_copy(destUUID, tmpUUID); + destUUID = tmpUUID; logger_->log_debug("Using 'destination name' containing a 
remote port id to match the destination for " "connection '%s': destination name => [%s]", name, connectionDestProcName); @@ -629,7 +632,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P } void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *parent, sitetosite::TransferDirection direction) { - uuid_t uuid; + utils::Identifier uuid; std::shared_ptr<core::Processor> processor = NULL; std::shared_ptr<minifi::RemoteProcessorGroupPort> port = NULL; @@ -652,7 +655,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup * "id should match the corresponding id specified in the NiFi configuration. " "This is a UUID of the format XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX."); auto portId = inputPortsObj["id"].as<std::string>(); - uuid_parse(portId.c_str(), uuid); + uuid = portId; port = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, nameStr, parent->getURL(), this->configuration_, uuid); @@ -845,11 +848,9 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, const std:: "of YAML::NodeType::Scalar."); } } else { - uuid_t uuid; + utils::Identifier uuid; id_generator_->generate(uuid); - char uuid_str[37]; - uuid_unparse(uuid, uuid_str); - id = uuid_str; + id = uuid.to_string(); logger_->log_debug("Generating random ID: id => [%s]", id); } return id; http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/ClientSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp deleted file mode 100644 index 113f914..0000000 --- a/libminifi/src/io/ClientSocket.cpp +++ /dev/null @@ -1,484 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "io/ClientSocket.h" -#include <netinet/tcp.h> -#include <sys/types.h> -#include <netdb.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <sys/ioctl.h> -#include <net/if.h> -#include <ifaddrs.h> -#include <unistd.h> -#include <cstdio> -#include <memory> -#include <utility> -#include <vector> -#include <cerrno> -#include <iostream> -#include <string> -#include "io/validation.h" -#include "core/logging/LoggerConfiguration.h" - -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace io { - -Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners = -1) - : requested_hostname_(hostname), - port_(port), - addr_info_(0), - socket_file_descriptor_(-1), - socket_max_(0), - total_written_(0), - total_read_(0), - is_loopback_only_(false), - listeners_(listeners), - canonical_hostname_(""), - nonBlocking_(false), - logger_(logging::LoggerFactory<Socket>::getLogger()) { - FD_ZERO(&total_list_); - FD_ZERO(&read_fds_); -} - -Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port) - : Socket(context, hostname, port, 0) { -} - -Socket::Socket(const Socket &&other) - : requested_hostname_(std::move(other.requested_hostname_)), - port_(std::move(other.port_)), - is_loopback_only_(false), - 
addr_info_(std::move(other.addr_info_)), - socket_file_descriptor_(other.socket_file_descriptor_), - socket_max_(other.socket_max_.load()), - listeners_(other.listeners_), - total_list_(other.total_list_), - read_fds_(other.read_fds_), - canonical_hostname_(std::move(other.canonical_hostname_)), - nonBlocking_(false), - logger_(std::move(other.logger_)) { - total_written_ = other.total_written_.load(); - total_read_ = other.total_read_.load(); -} - -Socket::~Socket() { - closeStream(); -} - -void Socket::closeStream() { - if (0 != addr_info_) { - freeaddrinfo(addr_info_); - addr_info_ = 0; - } - if (socket_file_descriptor_ >= 0) { - logging::LOG_DEBUG(logger_) << "Closing " << socket_file_descriptor_; - close(socket_file_descriptor_); - socket_file_descriptor_ = -1; - } - if (total_written_ > 0) { - local_network_interface_.log_write(total_written_); - total_written_ = 0; - } - if (total_read_ > 0) { - local_network_interface_.log_read(total_read_); - total_read_ = 0; - } -} - -void Socket::setNonBlocking() { - if (listeners_ <= 0) { - nonBlocking_ = true; - } -} - -int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { - if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { - logger_->log_error("error while connecting to server socket"); - return -1; - } - - setSocketOptions(socket_file_descriptor_); - - if (listeners_ <= 0 && !local_network_interface_.getInterface().empty()) { - // bind to local network interface - ifaddrs* list = NULL; - ifaddrs* item = NULL; - ifaddrs* itemFound = NULL; - int result = getifaddrs(&list); - if (result == 0) { - item = list; - while (item) { - if ((item->ifa_addr != NULL) && (item->ifa_name != NULL) && (AF_INET == item->ifa_addr->sa_family)) { - if (strcmp(item->ifa_name, local_network_interface_.getInterface().c_str()) == 0) { - itemFound = item; - break; - } - } - item = item->ifa_next; - } - - if (itemFound != NULL) { - result = bind(socket_file_descriptor_, 
itemFound->ifa_addr, sizeof(struct sockaddr_in)); - if (result < 0) - logger_->log_info("Bind to interface %s failed %s", local_network_interface_.getInterface(), strerror(errno)); - else - logger_->log_info("Bind to interface %s", local_network_interface_.getInterface()); - } - freeifaddrs(list); - } - } - - if (listeners_ > 0) { - struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr; - sa_loc->sin_family = AF_INET; - sa_loc->sin_port = htons(port_); - if (is_loopback_only_) { - sa_loc->sin_addr.s_addr = htonl(INADDR_LOOPBACK); - } else { - sa_loc->sin_addr.s_addr = htonl(INADDR_ANY); - } - if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) { - logger_->log_error("Could not bind to socket, reason %s", strerror(errno)); - return -1; - } - } - { - if (listeners_ <= 0) { - struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr; - sa_loc->sin_family = AF_INET; - sa_loc->sin_port = htons(port_); - // use any address if you are connecting to the local machine for testing - // otherwise we must use the requested hostname - if (IsNullOrEmpty(requested_hostname_) || requested_hostname_ == "localhost") { - if (is_loopback_only_) { - sa_loc->sin_addr.s_addr = htonl(INADDR_LOOPBACK); - } else { - sa_loc->sin_addr.s_addr = htonl(INADDR_ANY); - } - } else { - sa_loc->sin_addr.s_addr = addr; - } - if (connect(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) { - close(socket_file_descriptor_); - socket_file_descriptor_ = -1; - return -1; - } - } - } - - // listen - if (listeners_ > 0) { - if (listen(socket_file_descriptor_, listeners_) == -1) { - return -1; - } else { - logger_->log_debug("Created connection with %d listeners", listeners_); - } - } - // add the listener to the total set - FD_SET(socket_file_descriptor_, &total_list_); - socket_max_ = socket_file_descriptor_; - logger_->log_debug("Created connection with file descriptor %d", socket_file_descriptor_); - return 0; -} - -int16_t Socket::initialize() { - addrinfo hints = { 
sizeof(addrinfo) }; - memset(&hints, 0, sizeof hints); // make sure the struct is empty - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_CANONNAME; - if (listeners_ > 0) - hints.ai_flags |= AI_PASSIVE; - hints.ai_protocol = 0; /* any protocol */ - - int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, &addr_info_); - - if (errcode != 0) { - logger_->log_error("Saw error during getaddrinfo, error: %s", strerror(errno)); - return -1; - } - - socket_file_descriptor_ = -1; - - in_addr_t addr; - struct hostent *h; -#ifdef __MACH__ - h = gethostbyname(requested_hostname_.c_str()); -#else - const char *host; - - host = requested_hostname_.c_str(); - char buf[1024]; - struct hostent he; - int hh_errno; - gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); -#endif - if (h == nullptr) { - return -1; - } - memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length); - - auto p = addr_info_; - for (; p != NULL; p = p->ai_next) { - if (IsNullOrEmpty(canonical_hostname_)) { - if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname)) - canonical_hostname_ = p->ai_canonname; - } - // we've successfully connected - if (port_ > 0 && createConnection(p, addr) >= 0) { - // Put the socket in non-blocking mode: - if (nonBlocking_) { - if (fcntl(socket_file_descriptor_, F_SETFL, O_NONBLOCK) < 0) { - // handle error - logger_->log_error("Could not create non blocking to socket", strerror(errno)); - } else { - logger_->log_debug("Successfully applied O_NONBLOCK to fd"); - } - } - logger_->log_debug("Successfully created connection"); - return 0; - break; - } - } - - logger_->log_debug("Could not find device for our connection"); - return -1; -} - -int16_t Socket::select_descriptor(const uint16_t msec) { - if (listeners_ == 0) { - return socket_file_descriptor_; - } - - struct timeval tv; - - read_fds_ = total_list_; - - tv.tv_sec = msec / 1000; - tv.tv_usec = (msec % 1000) * 1000; - - 
std::lock_guard<std::recursive_mutex> guard(selection_mutex_); - - if (msec > 0) - select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv); - else - select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL); - - for (int i = 0; i <= socket_max_; i++) { - if (FD_ISSET(i, &read_fds_)) { - if (i == socket_file_descriptor_) { - if (listeners_ > 0) { - struct sockaddr_storage remoteaddr; // client address - socklen_t addrlen = sizeof remoteaddr; - int newfd = accept(socket_file_descriptor_, (struct sockaddr *) &remoteaddr, &addrlen); - FD_SET(newfd, &total_list_); // add to master set - if (newfd > socket_max_) { // keep track of the max - socket_max_ = newfd; - } - return newfd; - - } else { - return socket_file_descriptor_; - } - // we have a new connection - } else { - // data to be received on i - return i; - } - } - } - - logger_->log_debug("Could not find a suitable file descriptor or select timed out"); - - return -1; -} - -int16_t Socket::setSocketOptions(const int sock) { - int opt = 1; -#ifndef __MACH__ - if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt)) < 0) { - logger_->log_error("setsockopt() TCP_NODELAY failed"); - close(sock); - return -1; - } - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { - logger_->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return -1; - } - - int sndsize = 256 * 1024; - if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>(&sndsize), sizeof(sndsize)) < 0) { - logger_->log_error("setsockopt() SO_SNDBUF failed"); - close(sock); - return -1; - } - -#else - if (listeners_ > 0) { - // lose the pesky "address already in use" error message - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) { - logger_->log_error("setsockopt() SO_REUSEADDR failed"); - close(sock); - return -1; - } - } -#endif - return 0; -} - -std::string Socket::getHostname() const { - return canonical_hostname_; -} - 
-int Socket::writeData(std::vector<uint8_t> &buf, int buflen) { - if (static_cast<int>(buf.capacity()) < buflen) - return -1; - return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); -} - -// data stream overrides - -int Socket::writeData(uint8_t *value, int size) { - int ret = 0, bytes = 0; - - int fd = select_descriptor(1000); - while (bytes < size) { - ret = send(fd, value + bytes, size - bytes, 0); - // check for errors - if (ret <= 0) { - close(fd); - logger_->log_error("Could not send to %d, error: %s", fd, strerror(errno)); - return ret; - } - bytes += ret; - } - - if (ret) - logger_->log_trace("Send data size %d over socket %d", size, fd); - total_written_ += bytes; - return bytes; -} - -template<typename T> -inline std::vector<uint8_t> Socket::readBuffer(const T& t) { - std::vector<uint8_t> buf; - buf.resize(sizeof t); - readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t)); - return buf; -} - -int Socket::write(uint64_t base_value, bool is_little_endian) { - return Serializable::write(base_value, this, is_little_endian); -} - -int Socket::write(uint32_t base_value, bool is_little_endian) { - return Serializable::write(base_value, this, is_little_endian); -} - -int Socket::write(uint16_t base_value, bool is_little_endian) { - return Serializable::write(base_value, this, is_little_endian); -} - -int Socket::read(uint64_t &value, bool is_little_endian) { - auto buf = readBuffer(value); - - if (is_little_endian) { - value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24) - | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0); - } else { - value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32) - | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 
255) << 48) | ((uint64_t) (buf[7] & 255) << 56); - } - return sizeof(value); -} - -int Socket::read(uint32_t &value, bool is_little_endian) { - auto buf = readBuffer(value); - - if (is_little_endian) { - value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; - } else { - value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; - } - return sizeof(value); -} - -int Socket::read(uint16_t &value, bool is_little_endian) { - auto buf = readBuffer(value); - - if (is_little_endian) { - value = (buf[0] << 8) | buf[1]; - } else { - value = buf[0] | buf[1] << 8; - } - return sizeof(value); -} - -int Socket::readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes) { - if (static_cast<int>(buf.capacity()) < buflen) { - buf.resize(buflen); - } - return readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen, retrieve_all_bytes); -} - -int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) { - int32_t total_read = 0; - while (buflen) { - int16_t fd = select_descriptor(1000); - if (fd < 0) { - if (listeners_ <= 0) { - logger_->log_debug("fd %d close %i", fd, buflen); - close(socket_file_descriptor_); - } - return -1; - } - int bytes_read = recv(fd, buf, buflen, 0); - logger_->log_trace("Recv call %d", bytes_read); - if (bytes_read <= 0) { - if (bytes_read == 0) { - logger_->log_debug("Other side hung up on %d", fd); - } else { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - // continue - return -2; - } - logger_->log_error("Could not recv on %d ( port %d), error: %s", fd, port_, strerror(errno)); - } - return -1; - } - buflen -= bytes_read; - buf += bytes_read; - total_read += bytes_read; - if (!retrieve_all_bytes) { - break; - } - } - total_read_ += total_read; - return total_read; -} - -} /* namespace io */ -} /* namespace minifi */ -} /* namespace nifi */ -} /* namespace apache */ -} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/DataStream.cpp 
---------------------------------------------------------------------- diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp index 656bcdb..871e21a 100644 --- a/libminifi/src/io/DataStream.cpp +++ b/libminifi/src/io/DataStream.cpp @@ -16,7 +16,6 @@ * limitations under the License. */ #include "io/DataStream.h" -#include <arpa/inet.h> #include <vector> #include <iostream> #include <cstdint> @@ -24,6 +23,7 @@ #include <cstring> #include <string> #include <algorithm> +#include <iterator> namespace org { namespace apache { @@ -95,9 +95,14 @@ int DataStream::readData(std::vector<uint8_t> &buf, int buflen) { } if (static_cast<int>(buf.capacity()) < buflen) - buf.resize(buflen); + buf.resize(buflen+1); - buf.insert(buf.begin(), &buffer[readBuffer], &buffer[readBuffer + buflen]); +#ifdef WIN32 + // back inserter works differently on win32 versions + buf.insert(buf.begin(), &buffer[readBuffer], &buffer[(readBuffer + buflen-1)]); +#else + buf.insert(buf.begin(), &buffer[readBuffer], &buffer[(readBuffer + buflen)]); +#endif readBuffer += buflen; return buflen; @@ -108,9 +113,12 @@ int DataStream::readData(uint8_t *buf, int buflen) { // if read exceed return -1; } - - std::copy(&buffer[readBuffer], &buffer[readBuffer + buflen], buf); - +#ifdef WIN32 + // back inserter works differently on win32 versions + std::copy(&buffer[readBuffer], &buffer[(readBuffer + buflen-1)], buf); +#else + std::copy(&buffer[readBuffer], &buffer[(readBuffer + buflen)], buf); +#endif readBuffer += buflen; return buflen; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/DescriptorStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/DescriptorStream.cpp b/libminifi/src/io/DescriptorStream.cpp index d50a39f..38e1c85 100644 --- a/libminifi/src/io/DescriptorStream.cpp +++ b/libminifi/src/io/DescriptorStream.cpp @@ -18,7 +18,6 @@ #include "io/DescriptorStream.h" #include <fstream> 
-#include <unistd.h> #include <vector> #include <memory> #include <string> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/Serializable.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/Serializable.cpp b/libminifi/src/io/Serializable.cpp index 7d3c79a..081f3db 100644 --- a/libminifi/src/io/Serializable.cpp +++ b/libminifi/src/io/Serializable.cpp @@ -16,13 +16,17 @@ * limitations under the License. */ #include "io/Serializable.h" -#include <arpa/inet.h> #include <cstdio> #include <iostream> #include <vector> #include <string> #include <algorithm> #include "io/DataStream.h" +#ifdef WIN32 +#include "Winsock2.h" +#else +#include <arpa/inet.h> +#endif namespace org { namespace apache { namespace nifi { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/ServerSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/ServerSocket.cpp b/libminifi/src/io/ServerSocket.cpp index bfb098f..cf79e0c 100644 --- a/libminifi/src/io/ServerSocket.cpp +++ b/libminifi/src/io/ServerSocket.cpp @@ -17,13 +17,16 @@ */ #include "io/ServerSocket.h" #include "io/DescriptorStream.h" -#include <netinet/tcp.h> + #include <sys/types.h> +#ifndef WIN32 +#include <netinet/tcp.h> #include <netdb.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> +#endif #include <cstdio> #include <memory> #include <utility> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/StreamFactory.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/StreamFactory.cpp b/libminifi/src/io/StreamFactory.cpp index a66e4c1..bb021ae 100644 --- a/libminifi/src/io/StreamFactory.cpp +++ b/libminifi/src/io/StreamFactory.cpp @@ -98,7 +98,11 @@ StreamFactory::StreamFactory(const std::shared_ptr<Configure> &configure) { std::string 
secureStr; bool is_secure = false; if (configure->get(Configure::nifi_remote_input_secure, secureStr) && org::apache::nifi::minifi::utils::StringUtils::StringToBool(secureStr, is_secure)) { +#ifdef OPENSSL_SUPPORT delegate_ = std::make_shared<SocketCreator<TLSSocket, TLSContext>>(configure); +#else + delegate_ = std::make_shared<SocketCreator<Socket, SocketContext>>(configure); +#endif } else { delegate_ = std::make_shared<SocketCreator<Socket, SocketContext>>(configure); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/posix/ClientSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/posix/ClientSocket.cpp b/libminifi/src/io/posix/ClientSocket.cpp new file mode 100644 index 0000000..113f914 --- /dev/null +++ b/libminifi/src/io/posix/ClientSocket.cpp @@ -0,0 +1,484 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "io/ClientSocket.h" +#include <netinet/tcp.h> +#include <sys/types.h> +#include <netdb.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <sys/ioctl.h> +#include <net/if.h> +#include <ifaddrs.h> +#include <unistd.h> +#include <cstdio> +#include <memory> +#include <utility> +#include <vector> +#include <cerrno> +#include <iostream> +#include <string> +#include "io/validation.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners = -1) + : requested_hostname_(hostname), + port_(port), + addr_info_(0), + socket_file_descriptor_(-1), + socket_max_(0), + total_written_(0), + total_read_(0), + is_loopback_only_(false), + listeners_(listeners), + canonical_hostname_(""), + nonBlocking_(false), + logger_(logging::LoggerFactory<Socket>::getLogger()) { + FD_ZERO(&total_list_); + FD_ZERO(&read_fds_); +} + +Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port) + : Socket(context, hostname, port, 0) { +} + +Socket::Socket(const Socket &&other) + : requested_hostname_(std::move(other.requested_hostname_)), + port_(std::move(other.port_)), + is_loopback_only_(false), + addr_info_(std::move(other.addr_info_)), + socket_file_descriptor_(other.socket_file_descriptor_), + socket_max_(other.socket_max_.load()), + listeners_(other.listeners_), + total_list_(other.total_list_), + read_fds_(other.read_fds_), + canonical_hostname_(std::move(other.canonical_hostname_)), + nonBlocking_(false), + logger_(std::move(other.logger_)) { + total_written_ = other.total_written_.load(); + total_read_ = other.total_read_.load(); +} + +Socket::~Socket() { + closeStream(); +} + +void Socket::closeStream() { + if (0 != addr_info_) { + freeaddrinfo(addr_info_); + addr_info_ 
= 0; + } + if (socket_file_descriptor_ >= 0) { + logging::LOG_DEBUG(logger_) << "Closing " << socket_file_descriptor_; + close(socket_file_descriptor_); + socket_file_descriptor_ = -1; + } + if (total_written_ > 0) { + local_network_interface_.log_write(total_written_); + total_written_ = 0; + } + if (total_read_ > 0) { + local_network_interface_.log_read(total_read_); + total_read_ = 0; + } +} + +void Socket::setNonBlocking() { + if (listeners_ <= 0) { + nonBlocking_ = true; + } +} + +int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { + if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) { + logger_->log_error("error while connecting to server socket"); + return -1; + } + + setSocketOptions(socket_file_descriptor_); + + if (listeners_ <= 0 && !local_network_interface_.getInterface().empty()) { + // bind to local network interface + ifaddrs* list = NULL; + ifaddrs* item = NULL; + ifaddrs* itemFound = NULL; + int result = getifaddrs(&list); + if (result == 0) { + item = list; + while (item) { + if ((item->ifa_addr != NULL) && (item->ifa_name != NULL) && (AF_INET == item->ifa_addr->sa_family)) { + if (strcmp(item->ifa_name, local_network_interface_.getInterface().c_str()) == 0) { + itemFound = item; + break; + } + } + item = item->ifa_next; + } + + if (itemFound != NULL) { + result = bind(socket_file_descriptor_, itemFound->ifa_addr, sizeof(struct sockaddr_in)); + if (result < 0) + logger_->log_info("Bind to interface %s failed %s", local_network_interface_.getInterface(), strerror(errno)); + else + logger_->log_info("Bind to interface %s", local_network_interface_.getInterface()); + } + freeifaddrs(list); + } + } + + if (listeners_ > 0) { + struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr; + sa_loc->sin_family = AF_INET; + sa_loc->sin_port = htons(port_); + if (is_loopback_only_) { + sa_loc->sin_addr.s_addr = htonl(INADDR_LOOPBACK); + } else { + sa_loc->sin_addr.s_addr = htonl(INADDR_ANY); + } + 
if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) { + logger_->log_error("Could not bind to socket, reason %s", strerror(errno)); + return -1; + } + } + { + if (listeners_ <= 0) { + struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr; + sa_loc->sin_family = AF_INET; + sa_loc->sin_port = htons(port_); + // use any address if you are connecting to the local machine for testing + // otherwise we must use the requested hostname + if (IsNullOrEmpty(requested_hostname_) || requested_hostname_ == "localhost") { + if (is_loopback_only_) { + sa_loc->sin_addr.s_addr = htonl(INADDR_LOOPBACK); + } else { + sa_loc->sin_addr.s_addr = htonl(INADDR_ANY); + } + } else { + sa_loc->sin_addr.s_addr = addr; + } + if (connect(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) { + close(socket_file_descriptor_); + socket_file_descriptor_ = -1; + return -1; + } + } + } + + // listen + if (listeners_ > 0) { + if (listen(socket_file_descriptor_, listeners_) == -1) { + return -1; + } else { + logger_->log_debug("Created connection with %d listeners", listeners_); + } + } + // add the listener to the total set + FD_SET(socket_file_descriptor_, &total_list_); + socket_max_ = socket_file_descriptor_; + logger_->log_debug("Created connection with file descriptor %d", socket_file_descriptor_); + return 0; +} + +int16_t Socket::initialize() { + addrinfo hints = { sizeof(addrinfo) }; + memset(&hints, 0, sizeof hints); // make sure the struct is empty + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_CANONNAME; + if (listeners_ > 0) + hints.ai_flags |= AI_PASSIVE; + hints.ai_protocol = 0; /* any protocol */ + + int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, &addr_info_); + + if (errcode != 0) { + logger_->log_error("Saw error during getaddrinfo, error: %s", strerror(errno)); + return -1; + } + + socket_file_descriptor_ = -1; + + in_addr_t addr; + struct hostent *h; +#ifdef __MACH__ + h = 
gethostbyname(requested_hostname_.c_str()); +#else + const char *host; + + host = requested_hostname_.c_str(); + char buf[1024]; + struct hostent he; + int hh_errno; + gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); +#endif + if (h == nullptr) { + return -1; + } + memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length); + + auto p = addr_info_; + for (; p != NULL; p = p->ai_next) { + if (IsNullOrEmpty(canonical_hostname_)) { + if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname)) + canonical_hostname_ = p->ai_canonname; + } + // we've successfully connected + if (port_ > 0 && createConnection(p, addr) >= 0) { + // Put the socket in non-blocking mode: + if (nonBlocking_) { + if (fcntl(socket_file_descriptor_, F_SETFL, O_NONBLOCK) < 0) { + // handle error + logger_->log_error("Could not create non blocking to socket", strerror(errno)); + } else { + logger_->log_debug("Successfully applied O_NONBLOCK to fd"); + } + } + logger_->log_debug("Successfully created connection"); + return 0; + break; + } + } + + logger_->log_debug("Could not find device for our connection"); + return -1; +} + +int16_t Socket::select_descriptor(const uint16_t msec) { + if (listeners_ == 0) { + return socket_file_descriptor_; + } + + struct timeval tv; + + read_fds_ = total_list_; + + tv.tv_sec = msec / 1000; + tv.tv_usec = (msec % 1000) * 1000; + + std::lock_guard<std::recursive_mutex> guard(selection_mutex_); + + if (msec > 0) + select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv); + else + select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL); + + for (int i = 0; i <= socket_max_; i++) { + if (FD_ISSET(i, &read_fds_)) { + if (i == socket_file_descriptor_) { + if (listeners_ > 0) { + struct sockaddr_storage remoteaddr; // client address + socklen_t addrlen = sizeof remoteaddr; + int newfd = accept(socket_file_descriptor_, (struct sockaddr *) &remoteaddr, &addrlen); + FD_SET(newfd, &total_list_); // add to master set + if (newfd > socket_max_) { // keep track 
of the max + socket_max_ = newfd; + } + return newfd; + + } else { + return socket_file_descriptor_; + } + // we have a new connection + } else { + // data to be received on i + return i; + } + } + } + + logger_->log_debug("Could not find a suitable file descriptor or select timed out"); + + return -1; +} + +int16_t Socket::setSocketOptions(const int sock) { + int opt = 1; +#ifndef __MACH__ + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt)) < 0) { + logger_->log_error("setsockopt() TCP_NODELAY failed"); + close(sock); + return -1; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { + logger_->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return -1; + } + + int sndsize = 256 * 1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>(&sndsize), sizeof(sndsize)) < 0) { + logger_->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + return -1; + } + +#else + if (listeners_ > 0) { + // lose the pesky "address already in use" error message + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) { + logger_->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return -1; + } + } +#endif + return 0; +} + +std::string Socket::getHostname() const { + return canonical_hostname_; +} + +int Socket::writeData(std::vector<uint8_t> &buf, int buflen) { + if (static_cast<int>(buf.capacity()) < buflen) + return -1; + return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); +} + +// data stream overrides + +int Socket::writeData(uint8_t *value, int size) { + int ret = 0, bytes = 0; + + int fd = select_descriptor(1000); + while (bytes < size) { + ret = send(fd, value + bytes, size - bytes, 0); + // check for errors + if (ret <= 0) { + close(fd); + logger_->log_error("Could not send to %d, error: %s", fd, strerror(errno)); + return ret; + } + bytes += ret; + } + + if (ret) + 
logger_->log_trace("Send data size %d over socket %d", size, fd); + total_written_ += bytes; + return bytes; +} + +template<typename T> +inline std::vector<uint8_t> Socket::readBuffer(const T& t) { + std::vector<uint8_t> buf; + buf.resize(sizeof t); + readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t)); + return buf; +} + +int Socket::write(uint64_t base_value, bool is_little_endian) { + return Serializable::write(base_value, this, is_little_endian); +} + +int Socket::write(uint32_t base_value, bool is_little_endian) { + return Serializable::write(base_value, this, is_little_endian); +} + +int Socket::write(uint16_t base_value, bool is_little_endian) { + return Serializable::write(base_value, this, is_little_endian); +} + +int Socket::read(uint64_t &value, bool is_little_endian) { + auto buf = readBuffer(value); + + if (is_little_endian) { + value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24) + | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0); + } else { + value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32) + | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56); + } + return sizeof(value); +} + +int Socket::read(uint32_t &value, bool is_little_endian) { + auto buf = readBuffer(value); + + if (is_little_endian) { + value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; + } else { + value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; + } + return sizeof(value); +} + +int Socket::read(uint16_t &value, bool is_little_endian) { + auto buf = readBuffer(value); + + if (is_little_endian) { + value = (buf[0] << 8) | buf[1]; + } else { + value = buf[0] | buf[1] << 8; + } + return sizeof(value); 
+} + +int Socket::readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes) { + if (static_cast<int>(buf.capacity()) < buflen) { + buf.resize(buflen); + } + return readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen, retrieve_all_bytes); +} + +int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) { + int32_t total_read = 0; + while (buflen) { + int16_t fd = select_descriptor(1000); + if (fd < 0) { + if (listeners_ <= 0) { + logger_->log_debug("fd %d close %i", fd, buflen); + close(socket_file_descriptor_); + } + return -1; + } + int bytes_read = recv(fd, buf, buflen, 0); + logger_->log_trace("Recv call %d", bytes_read); + if (bytes_read <= 0) { + if (bytes_read == 0) { + logger_->log_debug("Other side hung up on %d", fd); + } else { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // continue + return -2; + } + logger_->log_error("Could not recv on %d ( port %d), error: %s", fd, port_, strerror(errno)); + } + return -1; + } + buflen -= bytes_read; + buf += bytes_read; + total_read += bytes_read; + if (!retrieve_all_bytes) { + break; + } + } + total_read_ += total_read; + return total_read; +} + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/tls/SecureDescriptorStream.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp b/libminifi/src/io/tls/SecureDescriptorStream.cpp index c1ddd8a..9e1334f 100644 --- a/libminifi/src/io/tls/SecureDescriptorStream.cpp +++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp @@ -18,7 +18,6 @@ #include "io/tls/SecureDescriptorStream.h" #include <fstream> -#include <unistd.h> #include <vector> #include <memory> #include <string> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/tls/TLSServerSocket.cpp 
---------------------------------------------------------------------- diff --git a/libminifi/src/io/tls/TLSServerSocket.cpp b/libminifi/src/io/tls/TLSServerSocket.cpp index f7901d3..22b01a3 100644 --- a/libminifi/src/io/tls/TLSServerSocket.cpp +++ b/libminifi/src/io/tls/TLSServerSocket.cpp @@ -17,13 +17,16 @@ */ #include "io/tls/SecureDescriptorStream.h" #include "io/tls/TLSServerSocket.h" -#include <netinet/tcp.h> + #include <sys/types.h> +#ifndef WIN32 +#include <netinet/tcp.h> #include <netdb.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> +#endif #include <cstdio> #include <memory> #include <chrono> http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/win/ClientSocket.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/io/win/ClientSocket.cpp b/libminifi/src/io/win/ClientSocket.cpp new file mode 100644 index 0000000..261c2af --- /dev/null +++ b/libminifi/src/io/win/ClientSocket.cpp @@ -0,0 +1,538 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "io/ClientSocket.h" +#ifndef WIN32 +#include <netinet/tcp.h> +#include <netdb.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <sys/ioctl.h> +#include <net/if.h> +#include <ifaddrs.h> +#include <unistd.h> +#endif +#include <sys/types.h> + +#include <cstdio> +#include <memory> +#include <utility> +#include <vector> +#include <cerrno> +#include <iostream> +#include <string> +#include "io/validation.h" +#include "core/logging/LoggerConfiguration.h" + +namespace org { +namespace apache { +namespace nifi { +namespace minifi { +namespace io { + +Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port, const uint16_t listeners = -1) + : requested_hostname_(hostname), + port_(port), + addr_info_(0), + socket_file_descriptor_(-1), + socket_max_(0), + total_written_(0), + total_read_(0), + is_loopback_only_(false), + listeners_(listeners), + canonical_hostname_(""), + nonBlocking_(false), + logger_(logging::LoggerFactory<Socket>::getLogger()) { + FD_ZERO(&total_list_); + FD_ZERO(&read_fds_); + initialize_socket(); +} + +Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port) + : Socket(context, hostname, port, 0) { + initialize_socket(); +} + +Socket::Socket(const Socket &&other) + : requested_hostname_(std::move(other.requested_hostname_)), + port_(std::move(other.port_)), + is_loopback_only_(false), + addr_info_(std::move(other.addr_info_)), + socket_file_descriptor_(other.socket_file_descriptor_), + socket_max_(other.socket_max_.load()), + listeners_(other.listeners_), + total_list_(other.total_list_), + read_fds_(other.read_fds_), + canonical_hostname_(std::move(other.canonical_hostname_)), + nonBlocking_(false), + logger_(std::move(other.logger_)) { + total_written_ = other.total_written_.load(); + total_read_ = other.total_read_.load(); +} + +Socket::~Socket() { + closeStream(); +} + +void 
Socket::closeStream() { + if (0 != addr_info_) { + freeaddrinfo(addr_info_); + addr_info_ = 0; + } + if (socket_file_descriptor_ >= 0 && socket_file_descriptor_ != INVALID_SOCKET) { + logging::LOG_DEBUG(logger_) << "Closing " << socket_file_descriptor_; +#ifdef WIN32 + closesocket(socket_file_descriptor_); +#else + close(socket_file_descriptor_); +#endif + socket_file_descriptor_ = -1; + } + if (total_written_ > 0) { + local_network_interface_.log_write(total_written_); + total_written_ = 0; + } + if (total_read_ > 0) { + local_network_interface_.log_read(total_read_); + total_read_ = 0; + } +} + +void Socket::setNonBlocking() { + if (listeners_ <= 0) { + nonBlocking_ = true; + } +} +#ifdef WIN32 +int8_t Socket::createConnection(const addrinfo *p, struct in_addr &addr) { +#else +int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) { +#endif + if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == INVALID_SOCKET) { + logger_->log_error("error while connecting to server socket"); + return -1; + } + + setSocketOptions(socket_file_descriptor_); + + if (listeners_ <= 0 && !local_network_interface_.getInterface().empty()) { + // bind to local network interface +#ifndef WIN32 + ifaddrs* list = NULL; + ifaddrs* item = NULL; + ifaddrs* itemFound = NULL; + int result = getifaddrs(&list); + if (result == 0) { + item = list; + while (item) { + if ((item->ifa_addr != NULL) && (item->ifa_name != NULL) && (AF_INET == item->ifa_addr->sa_family)) { + if (strcmp(item->ifa_name, local_network_interface_.getInterface().c_str()) == 0) { + itemFound = item; + break; + } + } + item = item->ifa_next; + } + + if (itemFound != NULL) { + result = bind(socket_file_descriptor_, itemFound->ifa_addr, sizeof(struct sockaddr_in)); + if (result < 0) + logger_->log_info("Bind to interface %s failed %s", local_network_interface_.getInterface(), strerror(errno)); + else + logger_->log_info("Bind to interface %s", local_network_interface_.getInterface()); 
+ } + freeifaddrs(list); + } +#endif + } + + if (listeners_ > 0) { + struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr; + sa_loc->sin_family = AF_INET; + sa_loc->sin_port = htons(port_); + if (is_loopback_only_) { + sa_loc->sin_addr.s_addr = htonl(INADDR_LOOPBACK); + } else { + sa_loc->sin_addr.s_addr = htonl(INADDR_ANY); + } + if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) { + logger_->log_error("Could not bind to socket, reason %s", strerror(errno)); + return -1; + } + } + { + if (listeners_ <= 0) { + struct sockaddr_in sa_loc; + memset(&sa_loc, 0x00, sizeof(sa_loc)); + sa_loc.sin_family = AF_INET; + sa_loc.sin_port = htons(port_); + + // use any address if you are connecting to the local machine for testing + // otherwise we must use the requested hostname + if (IsNullOrEmpty(requested_hostname_) || requested_hostname_ == "localhost") { + if (is_loopback_only_) { + sa_loc.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + } else { + sa_loc.sin_addr.s_addr = htonl(INADDR_ANY); + } + } else { +#ifdef WIN32 + sa_loc.sin_addr.s_addr = addr.s_addr; +#else + sa_loc.sin_addr.s_addr = addr; +#endif + } + if (connect(socket_file_descriptor_, (struct sockaddr*) &sa_loc, sizeof(sa_loc)) == -1) { +#ifdef WIN32 + int err = WSAGetLastError(); + if (err == WSAEADDRNOTAVAIL) { + logger_->log_error("invalid or unknown IP"); + } else if (err == WSAECONNREFUSED) { + logger_->log_error("Connection refused"); + } else { + logger_->log_error("Unknown error"); + } + +#endif + closeStream(); + socket_file_descriptor_ = -1; + return -1; + } + } + } + + // listen + if (listeners_ > 0) { + if (listen(socket_file_descriptor_, listeners_) == -1) { + return -1; + } else { + logger_->log_debug("Created connection with %d listeners", listeners_); + } + } + // add the listener to the total set + FD_SET(socket_file_descriptor_, &total_list_); + socket_max_ = socket_file_descriptor_; + logger_->log_debug("Created connection with file descriptor %d", 
socket_file_descriptor_); + return 0; +} + +int16_t Socket::initialize() { + addrinfo hints = { sizeof(addrinfo) }; + memset(&hints, 0, sizeof hints); // make sure the struct is empty + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_CANONNAME; + if (listeners_ > 0) + hints.ai_flags |= AI_PASSIVE; + hints.ai_protocol = 0; /* any protocol */ + + int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, &addr_info_); + + if (errcode != 0) { + logger_->log_error("Saw error during getaddrinfo, error: %s", strerror(errno)); + return -1; + } + + socket_file_descriptor_ = -1; + +#ifndef WIN32 + in_addr_t addr; +#else + in_addr addr; +#endif + struct hostent *h; +#if defined( __MACH__ ) || defined(WIN32) + h = gethostbyname(requested_hostname_.c_str()); +#else + const char *host; + + host = requested_hostname_.c_str(); + char buf[1024]; + struct hostent he; + int hh_errno; + gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); +#endif + if (h == nullptr) { + return -1; + } + memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length); + + auto p = addr_info_; + for (; p != NULL; p = p->ai_next) { + if (IsNullOrEmpty(canonical_hostname_)) { + if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname)) + canonical_hostname_ = p->ai_canonname; + } + // we've successfully connected + if (port_ > 0 && createConnection(p, addr) >= 0) { + // Put the socket in non-blocking mode: + if (nonBlocking_) { +#ifndef WIN32 + if (fcntl(socket_file_descriptor_, F_SETFL, O_NONBLOCK) < 0) { + // handle error + logger_->log_error("Could not create non blocking to socket", strerror(errno)); + } else { + logger_->log_debug("Successfully applied O_NONBLOCK to fd"); + } +#else + u_long iMode = 1; + if (ioctlsocket(socket_file_descriptor_, FIONBIO, &iMode) == NO_ERROR) { + logger_->log_debug("Successfully applied O_NONBLOCK to fd"); + } +#endif + } + logger_->log_debug("Successfully created connection"); + return 0; + break; + } + } + + 
logger_->log_debug("Could not find device for our connection"); + return -1; +} + +int16_t Socket::select_descriptor(const uint16_t msec) { + if (listeners_ == 0) { + return socket_file_descriptor_; + } + + struct timeval tv; + + read_fds_ = total_list_; + + tv.tv_sec = msec / 1000; + tv.tv_usec = (msec % 1000) * 1000; + + std::lock_guard<std::recursive_mutex> guard(selection_mutex_); + + if (msec > 0) + select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv); + else + select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL); + + for (int i = 0; i <= socket_max_; i++) { + if (FD_ISSET(i, &read_fds_)) { + if (i == socket_file_descriptor_) { + if (listeners_ > 0) { + struct sockaddr_storage remoteaddr; // client address + socklen_t addrlen = sizeof remoteaddr; + int newfd = accept(socket_file_descriptor_, (struct sockaddr *) &remoteaddr, &addrlen); + FD_SET(newfd, &total_list_); // add to master set + if (newfd > socket_max_) { // keep track of the max + socket_max_ = newfd; + } + return newfd; + + } else { + return socket_file_descriptor_; + } + // we have a new connection + } else { + // data to be received on i + return i; + } + } + } + + logger_->log_debug("Could not find a suitable file descriptor or select timed out"); + + return -1; +} + +int16_t Socket::setSocketOptions(const SocketDescriptor sock) { + int opt = 1; +#ifndef WIN32 +#ifndef __MACH__ + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), sizeof(opt)) < 0) { + logger_->log_error("setsockopt() TCP_NODELAY failed"); + close(sock); + return -1; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) { + logger_->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return -1; + } + + int sndsize = 256 * 1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>(&sndsize), sizeof(sndsize)) < 0) { + logger_->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + return -1; + } + +#else + if (listeners_ > 0) { + 
// lose the pesky "address already in use" error message + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&opt), sizeof(opt)) < 0) { + logger_->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return -1; + } + } +#endif +#endif + return 0; +} + +std::string Socket::getHostname() const { + return canonical_hostname_; +} + +int Socket::writeData(std::vector<uint8_t> &buf, int buflen) { + if (static_cast<int>(buf.capacity()) < buflen) + return -1; + return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen); +} + +// data stream overrides + +int Socket::writeData(uint8_t *value, int size) { + int ret = 0, bytes = 0; + + int fd = select_descriptor(1000); + while (bytes < size) { +#ifdef WIN32 + const char *vts = (const char*)value; + ret = send(fd, vts + bytes, size - bytes, 0); +#else + ret = send(fd, value + bytes, size - bytes, 0); +#endif + // check for errors + if (ret <= 0) { + close(fd); + logger_->log_error("Could not send to %d, error: %s", fd, strerror(errno)); + return ret; + } + bytes += ret; + } + + if (ret) + logger_->log_trace("Send data size %d over socket %d", size, fd); + total_written_ += bytes; + return bytes; +} + +template<typename T> +inline std::vector<uint8_t> Socket::readBuffer(const T& t) { + std::vector<uint8_t> buf; + buf.resize(sizeof t); + readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t)); + return buf; +} + +int Socket::write(uint64_t base_value, bool is_little_endian) { + return Serializable::write(base_value, this, is_little_endian); +} + +int Socket::write(uint32_t base_value, bool is_little_endian) { + return Serializable::write(base_value, this, is_little_endian); +} + +int Socket::write(uint16_t base_value, bool is_little_endian) { + return Serializable::write(base_value, this, is_little_endian); +} + +int Socket::read(uint64_t &value, bool is_little_endian) { + auto buf = readBuffer(value); + + if (is_little_endian) { + value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 
255) << 48) | ((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | ((uint64_t) (buf[4] & 255) << 24) + | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) | ((uint64_t) (buf[7] & 255) << 0); + } else { + value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | ((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | ((uint64_t) (buf[4] & 255) << 32) + | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 48) | ((uint64_t) (buf[7] & 255) << 56); + } + return sizeof(value); +} + +int Socket::read(uint32_t &value, bool is_little_endian) { + auto buf = readBuffer(value); + + if (is_little_endian) { + value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; + } else { + value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24; + } + return sizeof(value); +} + +int Socket::read(uint16_t &value, bool is_little_endian) { + auto buf = readBuffer(value); + + if (is_little_endian) { + value = (buf[0] << 8) | buf[1]; + } else { + value = buf[0] | buf[1] << 8; + } + return sizeof(value); +} + +int Socket::readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes) { + if (static_cast<int>(buf.capacity()) < buflen) { + buf.resize(buflen); + } + return readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen, retrieve_all_bytes); +} + +int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) { + int32_t total_read = 0; + while (buflen) { + int16_t fd = select_descriptor(1000); + if (fd < 0) { + if (listeners_ <= 0) { + logger_->log_debug("fd %d close %i", fd, buflen); + close(socket_file_descriptor_); + } + return -1; + } +#ifdef WIN32 + char *bfs = reinterpret_cast<char*>(buf); + int bytes_read = recv(fd, bfs, buflen, 0); +#else + int bytes_read = recv(fd, buf, buflen, 0); +#endif + logger_->log_trace("Recv call %d", bytes_read); + if (bytes_read <= 0) { + if (bytes_read == 0) { + logger_->log_debug("Other side hung up on %d", fd); + } else { + if (errno == EAGAIN 
|| errno == EWOULDBLOCK) { + // continue + return -2; + } + logger_->log_error("Could not recv on %d, error: %s", fd, strerror(errno)); + } + return -1; + } + buflen -= bytes_read; + buf += bytes_read; + total_read += bytes_read; + if (!retrieve_all_bytes) { + break; + } + } + total_read_ += total_read; + return total_read; +} + +} /* namespace io */ +} /* namespace minifi */ +} /* namespace nifi */ +} /* namespace apache */ +} /* namespace org */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/processors/AppendHostInfo.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/AppendHostInfo.cpp b/libminifi/src/processors/AppendHostInfo.cpp index bbc86a5..8769de2 100644 --- a/libminifi/src/processors/AppendHostInfo.cpp +++ b/libminifi/src/processors/AppendHostInfo.cpp @@ -20,14 +20,7 @@ #include "processors/AppendHostInfo.h" #define __USE_POSIX #include <limits.h> -#include <sys/time.h> #include <string.h> -#include <netdb.h> -#include <netinet/in.h> -#include <sys/socket.h> -#include <sys/ioctl.h> -#include <net/if.h> -#include <arpa/inet.h> #include <memory> #include <string> #include <set> @@ -42,6 +35,15 @@ namespace nifi { namespace minifi { namespace processors { +#ifndef WIN32 +#include <netdb.h> +#include <netinet/in.h> +#include <sys/socket.h> +#include <sys/ioctl.h> +#include <net/if.h> +#include <arpa/inet.h> +#endif + #ifndef HOST_NAME_MAX #define HOST_NAME_MAX 255 #endif @@ -80,6 +82,7 @@ void AppendHostInfo::onTrigger(core::ProcessContext *context, core::ProcessSessi std::string iface; context->getProperty(InterfaceName.getName(), iface); // Confirm the specified interface name exists on this device +#ifndef WIN32 if (if_nametoindex(iface.c_str()) != 0) { struct ifreq ifr; int fd = socket(AF_INET, SOCK_DGRAM, 0); @@ -94,6 +97,7 @@ void AppendHostInfo::onTrigger(core::ProcessContext *context, core::ProcessSessi context->getProperty(IPAttribute.getName(), 
ipAttribute); flow->addAttribute(ipAttribute.c_str(), inet_ntoa(((struct sockaddr_in *) &ifr.ifr_addr)->sin_addr)); } +#endif // Transfer to the relationship session->transfer(flow, Success); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/processors/ExecuteProcess.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ExecuteProcess.cpp b/libminifi/src/processors/ExecuteProcess.cpp index db4399e..6f43e8a 100644 --- a/libminifi/src/processors/ExecuteProcess.cpp +++ b/libminifi/src/processors/ExecuteProcess.cpp @@ -42,14 +42,17 @@ namespace apache { namespace nifi { namespace minifi { namespace processors { - -core::Property ExecuteProcess::Command("Command", "Specifies the command to be executed; if just the name of an executable" - " is provided, it must be in the user's environment PATH.", - ""); -core::Property ExecuteProcess::CommandArguments("Command Arguments", "The arguments to supply to the executable delimited by white space. White " - "space can be escaped by enclosing it in double-quotes.", - ""); -core::Property ExecuteProcess::WorkingDir("Working Directory", "The directory to use as the current working directory when executing the command", ""); +#ifndef WIN32 +core::Property ExecuteProcess::Command( + core::PropertyBuilder::createProperty("Command")->withDescription("Specifies the command to be executed; if just the name of an executable" + " is provided, it must be in the user's environment PATH.")->supportsExpressionLanguage(true)->withDefaultValue("")->build()); +core::Property ExecuteProcess::CommandArguments( + core::PropertyBuilder::createProperty("Command Arguments")->withDescription("The arguments to supply to the executable delimited by white space. 
White " + "space can be escaped by enclosing it in " + "double-quotes.")->supportsExpressionLanguage(true)->withDefaultValue("")->build()); +core::Property ExecuteProcess::WorkingDir( + core::PropertyBuilder::createProperty("Working Directory")->withDescription("The directory to use as the current working directory when executing the command")->supportsExpressionLanguage(true) + ->withDefaultValue("")->build()); core::Property ExecuteProcess::BatchDuration("Batch Duration", "If the process is expected to be long-running and produce textual output, a " "batch duration can be specified.", "0"); @@ -74,13 +77,13 @@ void ExecuteProcess::initialize() { void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { std::string value; std::shared_ptr<core::FlowFile> flow_file; - if (context->getProperty(Command.getName(), value, flow_file)) { + if (context->getProperty(Command, value, flow_file)) { this->_command = value; } - if (context->getProperty(CommandArguments.getName(), value, flow_file)) { + if (context->getProperty(CommandArguments, value, flow_file)) { this->_commandArgument = value; } - if (context->getProperty(WorkingDir.getName(), value, flow_file)) { + if (context->getProperty(WorkingDir, value, flow_file)) { this->_workingDir = value; } if (context->getProperty(BatchDuration.getName(), value)) { @@ -229,7 +232,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext *context, core::ProcessSessi } } } - +#endif } /* namespace processors */ } /* namespace minifi */ } /* namespace nifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/processors/ExtractText.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/ExtractText.cpp b/libminifi/src/processors/ExtractText.cpp index 2175415..328a476 100644 --- a/libminifi/src/processors/ExtractText.cpp +++ b/libminifi/src/processors/ExtractText.cpp @@ -41,78 +41,78 @@ core::Property 
ExtractText::SizeLimit("Size Limit", "Maximum number of bytes to core::Relationship ExtractText::Success("success", "success operational on the flow record"); void ExtractText::initialize() { - //! Set the supported properties - std::set<core::Property> properties; - properties.insert(Attribute); - setSupportedProperties(properties); - //! Set the supported relationships - std::set<core::Relationship> relationships; - relationships.insert(Success); - setSupportedRelationships(relationships); + //! Set the supported properties + std::set<core::Property> properties; + properties.insert(Attribute); + setSupportedProperties(properties); + //! Set the supported relationships + std::set<core::Relationship> relationships; + relationships.insert(Success); + setSupportedRelationships(relationships); } void ExtractText::onTrigger(core::ProcessContext *context, core::ProcessSession *session) { - std::shared_ptr<core::FlowFile> flowFile = session->get(); + std::shared_ptr<core::FlowFile> flowFile = session->get(); - if (!flowFile) { - return; - } + if (!flowFile) { + return; + } - ReadCallback cb(flowFile, context); - session->read(flowFile, &cb); - session->transfer(flowFile, Success); + ReadCallback cb(flowFile, context); + session->read(flowFile, &cb); + session->transfer(flowFile, Success); } int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> stream) { - int64_t ret = 0; - uint64_t size_limit = flowFile_->getSize(); - uint64_t read_size = 0; - uint64_t loop_read = max_read_; - - std::string attrKey, sizeLimitStr; - ctx_->getProperty(Attribute.getName(), attrKey); - ctx_->getProperty(SizeLimit.getName(), sizeLimitStr); - - if (sizeLimitStr == "") - size_limit = DEFAULT_SIZE_LIMIT; - else if (sizeLimitStr != "0") - size_limit = std::stoi(sizeLimitStr); - - std::ostringstream contentStream; - std::string contentStr; - - while (read_size < size_limit) { - if (size_limit - read_size < (uint64_t)max_read_) - loop_read = size_limit - read_size; - - ret = 
stream->readData(buffer_, loop_read); - buffer_.resize(ret); - - if (ret < 0) { - return -1; - } - - if (ret > 0) { - contentStream.write(reinterpret_cast<const char*>(&buffer_[0]), ret); - if (contentStream.fail()) { - return -1; - } - } else { - break; - } + int64_t ret = 0; + uint64_t size_limit = flowFile_->getSize(); + uint64_t read_size = 0; + uint64_t loop_read = max_read_; + + std::string attrKey, sizeLimitStr; + ctx_->getProperty(Attribute.getName(), attrKey); + ctx_->getProperty(SizeLimit.getName(), sizeLimitStr); + + if (sizeLimitStr == "") + size_limit = DEFAULT_SIZE_LIMIT; + else if (sizeLimitStr != "0") + size_limit = std::stoi(sizeLimitStr); + + std::ostringstream contentStream; + std::string contentStr; + + while (read_size < size_limit) { + if (size_limit - read_size < (uint64_t) max_read_) + loop_read = size_limit - read_size; + + ret = stream->readData(buffer_, loop_read); + buffer_.resize(ret); + + if (ret < 0) { + return -1; + } + + if (ret > 0) { + contentStream.write(reinterpret_cast<const char*>(&buffer_[0]), ret); + if (contentStream.fail()) { + return -1; + } + } else { + break; } + } - contentStr = contentStream.str(); - flowFile_->setAttribute(attrKey, contentStr); - return read_size; + contentStr = contentStream.str(); + flowFile_->setAttribute(attrKey, contentStr); + return read_size; } ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> flowFile, core::ProcessContext *ctx) - : max_read_(getpagesize()), + : max_read_(4096), flowFile_(flowFile), ctx_(ctx) { - buffer_.resize(max_read_); - } + buffer_.resize(max_read_); +} } /* namespace processors */ } /* namespace minifi */ http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/processors/GenerateFlowFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GenerateFlowFile.cpp b/libminifi/src/processors/GenerateFlowFile.cpp index 82abe82..223346f 100644 --- 
a/libminifi/src/processors/GenerateFlowFile.cpp +++ b/libminifi/src/processors/GenerateFlowFile.cpp @@ -18,7 +18,6 @@ * limitations under the License. */ #include "processors/GenerateFlowFile.h" -#include <sys/time.h> #include <time.h> #include <vector> #include <queue> @@ -29,6 +28,10 @@ #include <chrono> #include <thread> #include <random> +#ifdef WIN32 +#define srandom srand +#define random rand +#endif #include "utils/StringUtils.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" @@ -46,7 +49,7 @@ core::Property GenerateFlowFile::DataFormat("Data Format", "Specifies whether th core::Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles", "If true, each FlowFile that is generated will be unique. If false, a random value will be generated and all FlowFiles", "true"); core::Relationship GenerateFlowFile::Success("success", "success operational on the flow record"); const unsigned int TEXT_LEN = 90; -static const char TEXT_CHARS[TEXT_LEN+1] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t "; +static const char TEXT_CHARS[TEXT_LEN + 1] = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t "; void GenerateFlowFile::initialize() { // Set the supported properties http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/processors/GetFile.cpp ---------------------------------------------------------------------- diff --git a/libminifi/src/processors/GetFile.cpp b/libminifi/src/processors/GetFile.cpp index 4261d1a..89e50fc 100644 --- a/libminifi/src/processors/GetFile.cpp +++ b/libminifi/src/processors/GetFile.cpp @@ -16,15 +16,16 @@ * limitations under the License. 
*/ #include "processors/GetFile.h" -#include <sys/time.h> #include <sys/types.h> #include <sys/stat.h> #include <time.h> #include <stdio.h> -#include <dirent.h> #include <limits.h> -#include <unistd.h> +#ifndef WIN32 #include <regex.h> +#else +#include <regex> +#endif #include <vector> #include <queue> #include <map> @@ -34,10 +35,18 @@ #include <string> #include <iostream> #include "utils/StringUtils.h" +#include "utils/file/FileUtils.h" #include "utils/TimeUtil.h" #include "core/ProcessContext.h" #include "core/ProcessSession.h" +#ifndef S_ISDIR +#define S_ISDIR(mode) (((mode) & S_IFMT) == S_IFDIR) +#endif +#define R_OK 4 /* Test for read permission. */ +#define W_OK 2 /* Test for write permission. */ +#define F_OK 0 /* Test for existence. */ + namespace org { namespace apache { namespace nifi { @@ -45,7 +54,9 @@ namespace minifi { namespace processors { core::Property GetFile::BatchSize("Batch Size", "The maximum number of files to pull in each iteration", "10"); -core::Property GetFile::Directory("Input Directory", "The input directory from which to pull files", ".", true, "", {}, {}); +core::Property GetFile::Directory( + core::PropertyBuilder::createProperty("Input Directory")->withDescription("The input directory from which to pull files")->isRequired(true)->supportsExpressionLanguage(true)->withDefaultValue(".") + ->build()); core::Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates whether or not hidden files should be ignored", "true"); core::Property GetFile::KeepSourceFile("Keep Source File", "If true, the file is not deleted after it has been copied to the Content Repository", "false"); core::Property GetFile::MaxAge("Maximum File Age", "The minimum age that a file must be in order to be pulled;" @@ -137,7 +148,7 @@ void GetFile::onTrigger(core::ProcessContext *context, core::ProcessSession *ses if (request_.pollInterval == 0 || (getTimeMillis() - last_listing_time_) > request_.pollInterval) { std::string directory; const 
std::shared_ptr<core::FlowFile> flow_file; - if (!context->getProperty(Directory.getName(), directory, flow_file)) { + if (!context->getProperty(Directory, directory, flow_file)) { logger_->log_warn("Resolved missing Input Directory property value"); } performListing(directory, request_); @@ -203,10 +214,10 @@ bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRe struct stat statbuf; if (stat(fullName.c_str(), &statbuf) == 0) { - if (request.minSize > 0 && statbuf.st_size < (int32_t)request.minSize) + if (request.minSize > 0 && statbuf.st_size < (int32_t) request.minSize) return false; - if (request.maxSize > 0 && statbuf.st_size > (int32_t)request.maxSize) + if (request.maxSize > 0 && statbuf.st_size > (int32_t) request.maxSize) return false; uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000); @@ -224,7 +235,7 @@ bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRe if (request.keepSourceFile == false && access(fullName.c_str(), W_OK) != 0) return false; - +#ifndef WIN32 regex_t regex; int ret = regcomp(®ex, request.fileFilter.c_str(), 0); if (ret) @@ -233,6 +244,12 @@ bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRe regfree(®ex); if (ret) return false; +#else + std::regex regex(request.fileFilter); + if (!std::regex_match(name, regex)) { + return false; + } +#endif metrics_->input_bytes_ += statbuf.st_size; metrics_->accepted_files_++; return true; @@ -242,6 +259,7 @@ bool GetFile::acceptFile(std::string fullName, std::string name, const GetFileRe } void GetFile::performListing(std::string dir, const GetFileRequest &request) { +#ifndef WIN32 DIR *d; d = opendir(dir.c_str()); if (!d) @@ -255,7 +273,7 @@ void GetFile::performListing(std::string dir, const GetFileRequest &request) { break; std::string d_name = entry->d_name; std::string path = dir + "/" + d_name; - struct stat statbuf{}; + struct stat statbuf { }; if (stat(path.c_str(), &statbuf) != 0) { 
logger_->log_warn("Failed to stat %s", path); break; @@ -273,6 +291,33 @@ void GetFile::performListing(std::string dir, const GetFileRequest &request) { } } closedir(d); +#else + HANDLE hFind; + WIN32_FIND_DATA FindFileData; + + if ((hFind = FindFirstFile(dir.c_str(), &FindFileData)) != INVALID_HANDLE_VALUE) { + do { + struct stat statbuf {}; + if (stat(FindFileData.cFileName, &statbuf) != 0) { + logger_->log_warn("Failed to stat %s", FindFileData.cFileName); + break; + } + + std::string path = dir + "/" + FindFileData.cFileName; + if (S_ISDIR(statbuf.st_mode)) { + if (request.recursive && strcmp(FindFileData.cFileName, "..") != 0 && strcmp(FindFileData.cFileName, ".") != 0) { + performListing(path, request); + } + } else { + if (acceptFile(path, FindFileData.cFileName, request)) { + // check whether we can take this file + putListing(path); + } + } + }while (FindNextFile(hFind, &FindFileData)); + FindClose(hFind); + } +#endif } int16_t GetFile::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector) {