http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/ProcessorNode.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/ProcessorNode.cpp b/libminifi/src/core/ProcessorNode.cpp
index 0a21f4d..81f8c9b 100644
--- a/libminifi/src/core/ProcessorNode.cpp
+++ b/libminifi/src/core/ProcessorNode.cpp
@@ -26,17 +26,17 @@ namespace core {
 
 ProcessorNode::ProcessorNode(const std::shared_ptr<Connectable> &processor)
     : processor_(processor),
-      Connectable(processor->getName(), 0),
+      Connectable(processor->getName()),
       ConfigurableComponent() {
-  uuid_t copy;
+  utils::Identifier copy;
   processor->getUUID(copy);
   setUUID(copy);
 }
 
 ProcessorNode::ProcessorNode(const ProcessorNode &other)
     : processor_(other.processor_),
-      Connectable(other.getName(), 0) {
-  uuid_t copy;
+      Connectable(other.getName()) {
+  utils::Identifier copy;
   processor_->getUUID(copy);
   setUUID(copy);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Property.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Property.cpp b/libminifi/src/core/Property.cpp
index bb84e44..f3205d6 100644
--- a/libminifi/src/core/Property.cpp
+++ b/libminifi/src/core/Property.cpp
@@ -50,6 +50,10 @@ bool Property::getRequired() const {
   return is_required_;
 }
 
+bool Property::supportsExpressionLangauge() const {
+  return supports_el_;
+}
+
 std::string Property::getValidRegex() const {
   return valid_regex_;
 }
@@ -67,6 +71,10 @@ void Property::setValue(std::string value) {
   }
 }
 
+void Property::setSupportsExpressionLanguage(bool supportEl) {
+  supports_el_ = supportEl;
+}
+
 void Property::addValue(std::string value) {
   values_.push_back(std::move(value));
 }
@@ -86,6 +94,7 @@ const Property &Property::operator=(const Property &other) {
   valid_regex_ = other.valid_regex_;
   dependent_properties_ = other.dependent_properties_;
   exclusive_of_properties_ = other.exclusive_of_properties_;
+  supports_el_ = other.supports_el_;
   return *this;
 }
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/Repository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/Repository.cpp b/libminifi/src/core/Repository.cpp
index 1432d9e..8c5fa43 100644
--- a/libminifi/src/core/Repository.cpp
+++ b/libminifi/src/core/Repository.cpp
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 #include "core/Repository.h"
-#include <arpa/inet.h>
 #include <cstdint>
 #include <vector>
 

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/logging/LoggerConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp
index a0a207a..ef5be53 100644
--- a/libminifi/src/core/logging/LoggerConfiguration.cpp
+++ b/libminifi/src/core/logging/LoggerConfiguration.cpp
@@ -20,7 +20,6 @@
 
 #include "core/logging/LoggerConfiguration.h"
 #include <sys/stat.h>
-#include <unistd.h>
 #include <algorithm>
 #include <vector>
 #include <queue>
@@ -34,6 +33,13 @@
 #include "spdlog/spdlog.h"
 #include "spdlog/sinks/stdout_sinks.h"
 #include "spdlog/sinks/null_sink.h"
+#ifdef WIN32
+#define stat _stat
+#include <direct.h>
+#define _WINSOCKAPI_
+#include <windows.h>
+#include <tchar.h>
+#endif
 
 namespace org {
 namespace apache {
@@ -120,10 +126,17 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_names
         // Create the log directory if needed
         directory += "/logs";
         struct stat logDirStat;
+#ifdef WIN32
+        if (stat(directory.c_str(), &logDirStat) != 0) {
+          if (_mkdir(directory.c_str()) == -1) {
+            exit(1);
+          }
+#else
         if (stat(directory.c_str(), &logDirStat) != 0 || 
!S_ISDIR(logDirStat.st_mode)) {
           if (mkdir(directory.c_str(), 0777) == -1) {
             exit(1);
           }
+#endif
         }
         file_name = directory + "/" + file_name;
       }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/repository/VolatileContentRepository.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/repository/VolatileContentRepository.cpp b/libminifi/src/core/repository/VolatileContentRepository.cpp
index 0e99311..47c7ba6 100644
--- a/libminifi/src/core/repository/VolatileContentRepository.cpp
+++ b/libminifi/src/core/repository/VolatileContentRepository.cpp
@@ -17,6 +17,7 @@
  */
 
 #include "core/repository/VolatileContentRepository.h"
+#include "capi/expect.h"
 #include <cstdio>
 #include <string>
 #include <memory>
@@ -102,7 +103,7 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar
   }
 
   int size = 0;
-  if (__builtin_expect(minimize_locking_ == true, 1)) {
+  if (LIKELY(minimize_locking_ == true)) {
     for (auto ent : value_vector_) {
       if (ent->testAndSetKey(claim, nullptr, nullptr, 
resource_claim_comparator_)) {
         std::lock_guard<std::mutex> lock(map_mutex_);
@@ -113,15 +114,15 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::write(const std::shar
       size++;
     }
   } else {
-    std::lock_guard < std::mutex > lock(map_mutex_);
+    std::lock_guard<std::mutex> lock(map_mutex_);
     auto claim_check = master_list_.find(claim->getContentFullPath());
     if (claim_check != master_list_.end()) {
-      return std::make_shared < 
io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, 
claim_check->second);
+      return 
std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim,
 claim_check->second);
     } else {
       AtomicEntry<std::shared_ptr<minifi::ResourceClaim>> *ent = new 
AtomicEntry<std::shared_ptr<minifi::ResourceClaim>>(&current_size_, &max_size_);
       if (ent->testAndSetKey(claim, nullptr, nullptr, 
resource_claim_comparator_)) {
         master_list_[claim->getContentFullPath()] = ent;
-        return std::make_shared< 
io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim, ent);
+        return 
std::make_shared<io::AtomicEntryStream<std::shared_ptr<minifi::ResourceClaim>>>(claim,
 ent);
       }
     }
   }
@@ -158,7 +159,7 @@ std::shared_ptr<io::BaseStream> VolatileContentRepository::read(const std::share
 }
 
 bool VolatileContentRepository::remove(const 
std::shared_ptr<minifi::ResourceClaim> &claim) {
-  if (__builtin_expect(minimize_locking_ == true, 1)) {
+  if (LIKELY(minimize_locking_ == true)) {
     std::lock_guard<std::mutex> lock(map_mutex_);
     auto ent = master_list_.find(claim->getContentFullPath());
     if (ent != master_list_.end()) {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/core/yaml/YamlConfiguration.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/core/yaml/YamlConfiguration.cpp b/libminifi/src/core/yaml/YamlConfiguration.cpp
index ef3a81f..af88082 100644
--- a/libminifi/src/core/yaml/YamlConfiguration.cpp
+++ b/libminifi/src/core/yaml/YamlConfiguration.cpp
@@ -35,7 +35,7 @@ namespace core {
 std::shared_ptr<utils::IdGenerator> YamlConfiguration::id_generator_ = 
utils::IdGenerator::getIdGenerator();
 
 core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node 
rootFlowNode) {
-  uuid_t uuid;
+  utils::Identifier uuid;
   int version = 0;
 
   checkRequiredField(&rootFlowNode, "name",
@@ -50,7 +50,7 @@ core::ProcessGroup *YamlConfiguration::parseRootProcessGroupYaml(YAML::Node root
   }
 
   std::string id = getOrGenerateId(&rootFlowNode);
-  uuid_parse(id.c_str(), uuid);
+  uuid = id;
 
   if (rootFlowNode["version"]) {
     version = rootFlowNode["version"].as<int>();
@@ -69,7 +69,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
   int64_t penalizationPeriod = -1;
   int64_t yieldPeriod = -1;
   int64_t runDurationNanos = -1;
-  uuid_t uuid;
+  utils::Identifier uuid;
   std::shared_ptr<core::Processor> processor = nullptr;
 
   if (!parentGroup) {
@@ -97,7 +97,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
           registerResource(lib_location_str, lib_function_str);
         }
 
-        uuid_parse(procCfg.id.c_str(), uuid);
+        uuid = procCfg.id.c_str();
         logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", 
procCfg.name, procCfg.id);
         checkRequiredField(&procNode, "class", CONFIG_YAML_PROCESSORS_KEY);
         procCfg.javaClass = procNode["class"].as<std::string>();
@@ -130,6 +130,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
 
         auto periodNode = getOptionalField(&procNode, "scheduling period", 
YAML::Node(DEFAULT_SCHEDULING_PERIOD_STR),
         CONFIG_YAML_PROCESSORS_KEY);
+
         procCfg.schedulingPeriod = periodNode.as<std::string>();
         logger_->log_debug("parseProcessorNode: scheduling period => [%s]", 
procCfg.schedulingPeriod);
 
@@ -235,7 +236,7 @@ void YamlConfiguration::parseProcessorNodeYaml(YAML::Node processorsNode, core::
 }
 
 void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, 
core::ProcessGroup *parentGroup) {
-  uuid_t uuid;
+  utils::Identifier uuid;
   std::string id;
 
   if (!parentGroup) {
@@ -255,16 +256,17 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
 
         logger_->log_debug("parseRemoteProcessGroupYaml: name => [%s], id => 
[%s]", name, id);
 
-        checkRequiredField(&currRpgNode, "url",
+        auto urlNode = getOptionalField(&currRpgNode, "url", YAML::Node(""),
         CONFIG_YAML_REMOTE_PROCESS_GROUP_KEY);
-        std::string url = currRpgNode["url"].as<std::string>();
+
+        std::string url = urlNode.as<std::string>();
         logger_->log_debug("parseRemoteProcessGroupYaml: url => [%s]", url);
 
         core::ProcessGroup *group = NULL;
         core::TimeUnit unit;
         int64_t timeoutValue = -1;
         int64_t yieldPeriodValue = -1;
-        uuid_parse(id.c_str(), uuid);
+        uuid = id;
         group = this->createRemoteProcessGroup(name.c_str(), uuid).release();
         group->setParent(parentGroup);
         parentGroup->addProcessGroup(group);
@@ -355,7 +357,7 @@ void YamlConfiguration::parseRemoteProcessGroupYaml(YAML::Node *rpgNode, core::P
 }
 
 void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, 
core::ProcessGroup *parentGroup) {
-  uuid_t port_uuid;
+  utils::Identifier port_uuid;
   int64_t schedulingPeriod = -1;
 
   if (!parentGroup) {
@@ -419,7 +421,7 @@ void YamlConfiguration::parseProvenanceReportingYaml(YAML::Node *reportNode, cor
   auto batchSizeStr = node["batch size"].as<std::string>();
 
   logger_->log_debug("ProvenanceReportingTask port uuid %s", portUUIDStr);
-  uuid_parse(portUUIDStr.c_str(), port_uuid);
+  port_uuid = portUUIDStr;
   reportTask->setPortUUID(port_uuid);
 
   if (core::Property::StringToInt(batchSizeStr, lvalue)) {
@@ -450,8 +452,8 @@ void YamlConfiguration::parseControllerServices(YAML::Node *controllerServicesNo
           auto id = controllerServiceNode["id"].as<std::string>();
           auto type = controllerServiceNode["class"].as<std::string>();
 
-          uuid_t uuid;
-          uuid_parse(id.c_str(), uuid);
+          utils::Identifier uuid;
+          uuid = id;
           auto controller_service_node = createControllerService(type, name, 
uuid);
           if (nullptr != controller_service_node) {
             logger_->log_debug("Created Controller Service with UUID %s and 
name %s", id, name);
@@ -488,7 +490,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
         std::shared_ptr<minifi::Connection> connection = nullptr;
 
         // Configure basic connection
-        uuid_t uuid;
+        utils::Identifier uuid;
         std::string id = getOrGenerateId(&connectionNode);
 
         // Default name to be same as ID
@@ -499,7 +501,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
           name = connectionNode["name"].as<std::string>();
         }
 
-        uuid_parse(id.c_str(), uuid);
+        uuid = id;
         connection = this->createConnection(name, uuid);
         logger_->log_debug("Created connection with UUID %s and name %s", id, 
name);
 
@@ -526,7 +528,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
           }
         }
 
-        uuid_t srcUUID;
+        utils::Identifier srcUUID;
 
         if (connectionNode["max work queue size"]) {
           auto max_work_queue_str = connectionNode["max work queue 
size"].as<std::string>();
@@ -548,7 +550,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
 
         if (connectionNode["source id"]) {
           std::string connectionSrcProcId = connectionNode["source 
id"].as<std::string>();
-          uuid_parse(connectionSrcProcId.c_str(), srcUUID);
+          srcUUID = connectionSrcProcId;
           logger_->log_debug("Using 'source id' to match source with same id 
for "
                              "connection '%s': source id => [%s]",
                              name, connectionSrcProcId);
@@ -557,10 +559,11 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
           checkRequiredField(&connectionNode, "source name",
           CONFIG_YAML_CONNECTIONS_KEY);
           std::string connectionSrcProcName = connectionNode["source 
name"].as<std::string>();
-          uuid_t tmpUUID;
-          if (!uuid_parse(connectionSrcProcName.c_str(), tmpUUID) && NULL != 
parent->findProcessor(tmpUUID)) {
+          utils::Identifier tmpUUID;
+          tmpUUID = connectionSrcProcName;
+          if (NULL != parent->findProcessor(tmpUUID)) {
             // the source name is a remote port id, so use that as the source 
id
-            uuid_copy(srcUUID, tmpUUID);
+            srcUUID = tmpUUID;
             logger_->log_debug("Using 'source name' containing a remote port 
id to match the source for "
                                "connection '%s': source name => [%s]",
                                name, connectionSrcProcName);
@@ -582,10 +585,10 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
         connection->setSourceUUID(srcUUID);
 
         // Configure connection destination
-        uuid_t destUUID;
+        utils::Identifier destUUID;
         if (connectionNode["destination id"]) {
           std::string connectionDestProcId = connectionNode["destination 
id"].as<std::string>();
-          uuid_parse(connectionDestProcId.c_str(), destUUID);
+          destUUID = connectionDestProcId;
           logger_->log_debug("Using 'destination id' to match destination with 
same id for "
                              "connection '%s': destination id => [%s]",
                              name, connectionDestProcId);
@@ -595,11 +598,11 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
           checkRequiredField(&connectionNode, "destination name",
           CONFIG_YAML_CONNECTIONS_KEY);
           std::string connectionDestProcName = connectionNode["destination 
name"].as<std::string>();
-          uuid_t tmpUUID;
-          if (!uuid_parse(connectionDestProcName.c_str(), tmpUUID) &&
-          NULL != parent->findProcessor(tmpUUID)) {
+          utils::Identifier tmpUUID;
+          tmpUUID = connectionDestProcName;
+          if (parent->findProcessor(tmpUUID)) {
             // the destination name is a remote port id, so use that as the 
dest id
-            uuid_copy(destUUID, tmpUUID);
+            destUUID = tmpUUID;
             logger_->log_debug("Using 'destination name' containing a remote 
port id to match the destination for "
                                "connection '%s': destination name => [%s]",
                                name, connectionDestProcName);
@@ -629,7 +632,7 @@ void YamlConfiguration::parseConnectionYaml(YAML::Node *connectionsNode, core::P
 }
 
 void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup 
*parent, sitetosite::TransferDirection direction) {
-  uuid_t uuid;
+  utils::Identifier uuid;
   std::shared_ptr<core::Processor> processor = NULL;
   std::shared_ptr<minifi::RemoteProcessorGroupPort> port = NULL;
 
@@ -652,7 +655,7 @@ void YamlConfiguration::parsePortYaml(YAML::Node *portNode, core::ProcessGroup *
                          "id should match the corresponding id specified in 
the NiFi configuration. "
                          "This is a UUID of the format 
XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.");
   auto portId = inputPortsObj["id"].as<std::string>();
-  uuid_parse(portId.c_str(), uuid);
+  uuid = portId;
 
   port = std::make_shared<minifi::RemoteProcessorGroupPort>(stream_factory_, 
nameStr, parent->getURL(), this->configuration_, uuid);
 
@@ -845,11 +848,9 @@ std::string YamlConfiguration::getOrGenerateId(YAML::Node *yamlNode, const std::
                                   "of YAML::NodeType::Scalar.");
     }
   } else {
-    uuid_t uuid;
+    utils::Identifier uuid;
     id_generator_->generate(uuid);
-    char uuid_str[37];
-    uuid_unparse(uuid, uuid_str);
-    id = uuid_str;
+    id = uuid.to_string();
     logger_->log_debug("Generating random ID: id => [%s]", id);
   }
   return id;

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ClientSocket.cpp b/libminifi/src/io/ClientSocket.cpp
deleted file mode 100644
index 113f914..0000000
--- a/libminifi/src/io/ClientSocket.cpp
+++ /dev/null
@@ -1,484 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "io/ClientSocket.h"
-#include <netinet/tcp.h>
-#include <sys/types.h>
-#include <netdb.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <arpa/inet.h>
-#include <sys/ioctl.h>
-#include <net/if.h>
-#include <ifaddrs.h>
-#include <unistd.h>
-#include <cstdio>
-#include <memory>
-#include <utility>
-#include <vector>
-#include <cerrno>
-#include <iostream>
-#include <string>
-#include "io/validation.h"
-#include "core/logging/LoggerConfiguration.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace io {
-
-Socket::Socket(const std::shared_ptr<SocketContext> &context, const 
std::string &hostname, const uint16_t port, const uint16_t listeners = -1)
-    : requested_hostname_(hostname),
-      port_(port),
-      addr_info_(0),
-      socket_file_descriptor_(-1),
-      socket_max_(0),
-      total_written_(0),
-      total_read_(0),
-      is_loopback_only_(false),
-      listeners_(listeners),
-      canonical_hostname_(""),
-      nonBlocking_(false),
-      logger_(logging::LoggerFactory<Socket>::getLogger()) {
-  FD_ZERO(&total_list_);
-  FD_ZERO(&read_fds_);
-}
-
-Socket::Socket(const std::shared_ptr<SocketContext> &context, const 
std::string &hostname, const uint16_t port)
-    : Socket(context, hostname, port, 0) {
-}
-
-Socket::Socket(const Socket &&other)
-    : requested_hostname_(std::move(other.requested_hostname_)),
-      port_(std::move(other.port_)),
-      is_loopback_only_(false),
-      addr_info_(std::move(other.addr_info_)),
-      socket_file_descriptor_(other.socket_file_descriptor_),
-      socket_max_(other.socket_max_.load()),
-      listeners_(other.listeners_),
-      total_list_(other.total_list_),
-      read_fds_(other.read_fds_),
-      canonical_hostname_(std::move(other.canonical_hostname_)),
-      nonBlocking_(false),
-      logger_(std::move(other.logger_)) {
-  total_written_ = other.total_written_.load();
-  total_read_ = other.total_read_.load();
-}
-
-Socket::~Socket() {
-  closeStream();
-}
-
-void Socket::closeStream() {
-  if (0 != addr_info_) {
-    freeaddrinfo(addr_info_);
-    addr_info_ = 0;
-  }
-  if (socket_file_descriptor_ >= 0) {
-    logging::LOG_DEBUG(logger_) << "Closing " << socket_file_descriptor_;
-    close(socket_file_descriptor_);
-    socket_file_descriptor_ = -1;
-  }
-  if (total_written_ > 0) {
-    local_network_interface_.log_write(total_written_);
-    total_written_ = 0;
-  }
-  if (total_read_ > 0) {
-    local_network_interface_.log_read(total_read_);
-    total_read_ = 0;
-  }
-}
-
-void Socket::setNonBlocking() {
-  if (listeners_ <= 0) {
-    nonBlocking_ = true;
-  }
-}
-
-int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
-  if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, 
p->ai_protocol)) == -1) {
-    logger_->log_error("error while connecting to server socket");
-    return -1;
-  }
-
-  setSocketOptions(socket_file_descriptor_);
-
-  if (listeners_ <= 0 && !local_network_interface_.getInterface().empty()) {
-    // bind to local network interface
-    ifaddrs* list = NULL;
-    ifaddrs* item = NULL;
-    ifaddrs* itemFound = NULL;
-    int result = getifaddrs(&list);
-    if (result == 0) {
-      item = list;
-      while (item) {
-        if ((item->ifa_addr != NULL) && (item->ifa_name != NULL) && (AF_INET 
== item->ifa_addr->sa_family)) {
-          if (strcmp(item->ifa_name, 
local_network_interface_.getInterface().c_str()) == 0) {
-            itemFound = item;
-            break;
-          }
-        }
-        item = item->ifa_next;
-      }
-
-      if (itemFound != NULL) {
-        result = bind(socket_file_descriptor_, itemFound->ifa_addr, 
sizeof(struct sockaddr_in));
-        if (result < 0)
-          logger_->log_info("Bind to interface %s failed %s", 
local_network_interface_.getInterface(), strerror(errno));
-        else
-          logger_->log_info("Bind to interface %s", 
local_network_interface_.getInterface());
-      }
-      freeifaddrs(list);
-    }
-  }
-
-  if (listeners_ > 0) {
-    struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
-    sa_loc->sin_family = AF_INET;
-    sa_loc->sin_port = htons(port_);
-    if (is_loopback_only_) {
-      sa_loc->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-    } else {
-      sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
-    }
-    if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
-      logger_->log_error("Could not bind to socket, reason %s", 
strerror(errno));
-      return -1;
-    }
-  }
-  {
-    if (listeners_ <= 0) {
-      struct sockaddr_in *sa_loc = (struct sockaddr_in*) p->ai_addr;
-      sa_loc->sin_family = AF_INET;
-      sa_loc->sin_port = htons(port_);
-      // use any address if you are connecting to the local machine for testing
-      // otherwise we must use the requested hostname
-      if (IsNullOrEmpty(requested_hostname_) || requested_hostname_ == 
"localhost") {
-        if (is_loopback_only_) {
-          sa_loc->sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-        } else {
-          sa_loc->sin_addr.s_addr = htonl(INADDR_ANY);
-        }
-      } else {
-        sa_loc->sin_addr.s_addr = addr;
-      }
-      if (connect(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
-        close(socket_file_descriptor_);
-        socket_file_descriptor_ = -1;
-        return -1;
-      }
-    }
-  }
-
-  // listen
-  if (listeners_ > 0) {
-    if (listen(socket_file_descriptor_, listeners_) == -1) {
-      return -1;
-    } else {
-      logger_->log_debug("Created connection with %d listeners", listeners_);
-    }
-  }
-  // add the listener to the total set
-  FD_SET(socket_file_descriptor_, &total_list_);
-  socket_max_ = socket_file_descriptor_;
-  logger_->log_debug("Created connection with file descriptor %d", 
socket_file_descriptor_);
-  return 0;
-}
-
-int16_t Socket::initialize() {
-  addrinfo hints = { sizeof(addrinfo) };
-  memset(&hints, 0, sizeof hints);  // make sure the struct is empty
-  hints.ai_family = AF_UNSPEC;
-  hints.ai_socktype = SOCK_STREAM;
-  hints.ai_flags = AI_CANONNAME;
-  if (listeners_ > 0)
-    hints.ai_flags |= AI_PASSIVE;
-  hints.ai_protocol = 0; /* any protocol */
-
-  int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, 
&addr_info_);
-
-  if (errcode != 0) {
-    logger_->log_error("Saw error during getaddrinfo, error: %s", 
strerror(errno));
-    return -1;
-  }
-
-  socket_file_descriptor_ = -1;
-
-  in_addr_t addr;
-  struct hostent *h;
-#ifdef __MACH__
-  h = gethostbyname(requested_hostname_.c_str());
-#else
-  const char *host;
-
-  host = requested_hostname_.c_str();
-  char buf[1024];
-  struct hostent he;
-  int hh_errno;
-  gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
-#endif
-  if (h == nullptr) {
-    return -1;
-  }
-  memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length);
-
-  auto p = addr_info_;
-  for (; p != NULL; p = p->ai_next) {
-    if (IsNullOrEmpty(canonical_hostname_)) {
-      if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname))
-        canonical_hostname_ = p->ai_canonname;
-    }
-    // we've successfully connected
-    if (port_ > 0 && createConnection(p, addr) >= 0) {
-      // Put the socket in non-blocking mode:
-      if (nonBlocking_) {
-        if (fcntl(socket_file_descriptor_, F_SETFL, O_NONBLOCK) < 0) {
-          // handle error
-          logger_->log_error("Could not create non blocking to socket", 
strerror(errno));
-        } else {
-          logger_->log_debug("Successfully applied O_NONBLOCK to fd");
-        }
-      }
-      logger_->log_debug("Successfully created connection");
-      return 0;
-      break;
-    }
-  }
-
-  logger_->log_debug("Could not find device for our connection");
-  return -1;
-}
-
-int16_t Socket::select_descriptor(const uint16_t msec) {
-  if (listeners_ == 0) {
-    return socket_file_descriptor_;
-  }
-
-  struct timeval tv;
-
-  read_fds_ = total_list_;
-
-  tv.tv_sec = msec / 1000;
-  tv.tv_usec = (msec % 1000) * 1000;
-
-  std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
-
-  if (msec > 0)
-    select(socket_max_ + 1, &read_fds_, NULL, NULL, &tv);
-  else
-    select(socket_max_ + 1, &read_fds_, NULL, NULL, NULL);
-
-  for (int i = 0; i <= socket_max_; i++) {
-    if (FD_ISSET(i, &read_fds_)) {
-      if (i == socket_file_descriptor_) {
-        if (listeners_ > 0) {
-          struct sockaddr_storage remoteaddr;  // client address
-          socklen_t addrlen = sizeof remoteaddr;
-          int newfd = accept(socket_file_descriptor_, (struct sockaddr *) 
&remoteaddr, &addrlen);
-          FD_SET(newfd, &total_list_);  // add to master set
-          if (newfd > socket_max_) {    // keep track of the max
-            socket_max_ = newfd;
-          }
-          return newfd;
-
-        } else {
-          return socket_file_descriptor_;
-        }
-        // we have a new connection
-      } else {
-        // data to be received on i
-        return i;
-      }
-    }
-  }
-
-  logger_->log_debug("Could not find a suitable file descriptor or select 
timed out");
-
-  return -1;
-}
-
-int16_t Socket::setSocketOptions(const int sock) {
-  int opt = 1;
-#ifndef __MACH__
-  if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&opt), 
sizeof(opt)) < 0) {
-    logger_->log_error("setsockopt() TCP_NODELAY failed");
-    close(sock);
-    return -1;
-  }
-  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 
reinterpret_cast<char*>(&opt), sizeof(opt)) < 0) {
-    logger_->log_error("setsockopt() SO_REUSEADDR failed");
-    close(sock);
-    return -1;
-  }
-
-  int sndsize = 256 * 1024;
-  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char 
*>(&sndsize), sizeof(sndsize)) < 0) {
-    logger_->log_error("setsockopt() SO_SNDBUF failed");
-    close(sock);
-    return -1;
-  }
-
-#else
-  if (listeners_ > 0) {
-    // lose the pesky "address already in use" error message
-    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char 
*>(&opt), sizeof(opt)) < 0) {
-      logger_->log_error("setsockopt() SO_REUSEADDR failed");
-      close(sock);
-      return -1;
-    }
-  }
-#endif
-  return 0;
-}
-
-std::string Socket::getHostname() const {
-  return canonical_hostname_;
-}
-
-int Socket::writeData(std::vector<uint8_t> &buf, int buflen) {
-  if (static_cast<int>(buf.capacity()) < buflen)
-    return -1;
-  return writeData(reinterpret_cast<uint8_t *>(&buf[0]), buflen);
-}
-
-// data stream overrides
-
-int Socket::writeData(uint8_t *value, int size) {
-  int ret = 0, bytes = 0;
-
-  int fd = select_descriptor(1000);
-  while (bytes < size) {
-    ret = send(fd, value + bytes, size - bytes, 0);
-    // check for errors
-    if (ret <= 0) {
-      close(fd);
-      logger_->log_error("Could not send to %d, error: %s", fd, 
strerror(errno));
-      return ret;
-    }
-    bytes += ret;
-  }
-
-  if (ret)
-    logger_->log_trace("Send data size %d over socket %d", size, fd);
-  total_written_ += bytes;
-  return bytes;
-}
-
-template<typename T>
-inline std::vector<uint8_t> Socket::readBuffer(const T& t) {
-  std::vector<uint8_t> buf;
-  buf.resize(sizeof t);
-  readData(reinterpret_cast<uint8_t *>(&buf[0]), sizeof(t));
-  return buf;
-}
-
-int Socket::write(uint64_t base_value, bool is_little_endian) {
-  return Serializable::write(base_value, this, is_little_endian);
-}
-
-int Socket::write(uint32_t base_value, bool is_little_endian) {
-  return Serializable::write(base_value, this, is_little_endian);
-}
-
-int Socket::write(uint16_t base_value, bool is_little_endian) {
-  return Serializable::write(base_value, this, is_little_endian);
-}
-
-int Socket::read(uint64_t &value, bool is_little_endian) {
-  auto buf = readBuffer(value);
-
-  if (is_little_endian) {
-    value = ((uint64_t) buf[0] << 56) | ((uint64_t) (buf[1] & 255) << 48) | 
((uint64_t) (buf[2] & 255) << 40) | ((uint64_t) (buf[3] & 255) << 32) | 
((uint64_t) (buf[4] & 255) << 24)
-        | ((uint64_t) (buf[5] & 255) << 16) | ((uint64_t) (buf[6] & 255) << 8) 
| ((uint64_t) (buf[7] & 255) << 0);
-  } else {
-    value = ((uint64_t) buf[0] << 0) | ((uint64_t) (buf[1] & 255) << 8) | 
((uint64_t) (buf[2] & 255) << 16) | ((uint64_t) (buf[3] & 255) << 24) | 
((uint64_t) (buf[4] & 255) << 32)
-        | ((uint64_t) (buf[5] & 255) << 40) | ((uint64_t) (buf[6] & 255) << 
48) | ((uint64_t) (buf[7] & 255) << 56);
-  }
-  return sizeof(value);
-}
-
-int Socket::read(uint32_t &value, bool is_little_endian) {
-  auto buf = readBuffer(value);
-
-  if (is_little_endian) {
-    value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3];
-  } else {
-    value = buf[0] | buf[1] << 8 | buf[2] << 16 | buf[3] << 24;
-  }
-  return sizeof(value);
-}
-
-int Socket::read(uint16_t &value, bool is_little_endian) {
-  auto buf = readBuffer(value);
-
-  if (is_little_endian) {
-    value = (buf[0] << 8) | buf[1];
-  } else {
-    value = buf[0] | buf[1] << 8;
-  }
-  return sizeof(value);
-}
-
-int Socket::readData(std::vector<uint8_t> &buf, int buflen, bool 
retrieve_all_bytes) {
-  if (static_cast<int>(buf.capacity()) < buflen) {
-    buf.resize(buflen);
-  }
-  return readData(reinterpret_cast<uint8_t*>(&buf[0]), buflen, 
retrieve_all_bytes);
-}
-
-int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
-  int32_t total_read = 0;
-  while (buflen) {
-    int16_t fd = select_descriptor(1000);
-    if (fd < 0) {
-      if (listeners_ <= 0) {
-        logger_->log_debug("fd %d close %i", fd, buflen);
-        close(socket_file_descriptor_);
-      }
-      return -1;
-    }
-    int bytes_read = recv(fd, buf, buflen, 0);
-    logger_->log_trace("Recv call %d", bytes_read);
-    if (bytes_read <= 0) {
-      if (bytes_read == 0) {
-        logger_->log_debug("Other side hung up on %d", fd);
-      } else {
-        if (errno == EAGAIN || errno == EWOULDBLOCK) {
-          // continue
-          return -2;
-        }
-        logger_->log_error("Could not recv on %d ( port %d), error: %s", fd, 
port_, strerror(errno));
-      }
-      return -1;
-    }
-    buflen -= bytes_read;
-    buf += bytes_read;
-    total_read += bytes_read;
-    if (!retrieve_all_bytes) {
-      break;
-    }
-  }
-  total_read_ += total_read;
-  return total_read;
-}
-
-} /* namespace io */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/DataStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/DataStream.cpp b/libminifi/src/io/DataStream.cpp
index 656bcdb..871e21a 100644
--- a/libminifi/src/io/DataStream.cpp
+++ b/libminifi/src/io/DataStream.cpp
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 #include "io/DataStream.h"
-#include <arpa/inet.h>
 #include <vector>
 #include <iostream>
 #include <cstdint>
@@ -24,6 +23,7 @@
 #include <cstring>
 #include <string>
 #include <algorithm>
+#include <iterator>
 
 namespace org {
 namespace apache {
@@ -95,9 +95,14 @@ int DataStream::readData(std::vector<uint8_t> &buf, int 
buflen) {
   }
 
   if (static_cast<int>(buf.capacity()) < buflen)
-    buf.resize(buflen);
+    buf.resize(buflen+1);
 
-  buf.insert(buf.begin(), &buffer[readBuffer], &buffer[readBuffer + buflen]);
+#ifdef WIN32
+  // back inserter works differently on win32 versions
+  buf.insert(buf.begin(), &buffer[readBuffer], &buffer[(readBuffer + 
buflen-1)]);
+#else
+  buf.insert(buf.begin(), &buffer[readBuffer], &buffer[(readBuffer + buflen)]);
+#endif
 
   readBuffer += buflen;
   return buflen;
@@ -108,9 +113,12 @@ int DataStream::readData(uint8_t *buf, int buflen) {
     // if read exceed
     return -1;
   }
-
-  std::copy(&buffer[readBuffer], &buffer[readBuffer + buflen], buf);
-
+#ifdef WIN32
+  // back inserter works differently on win32 versions
+  std::copy(&buffer[readBuffer], &buffer[(readBuffer + buflen-1)], buf);
+#else
+  std::copy(&buffer[readBuffer], &buffer[(readBuffer + buflen)], buf);
+#endif
   readBuffer += buflen;
   return buflen;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/DescriptorStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/DescriptorStream.cpp 
b/libminifi/src/io/DescriptorStream.cpp
index d50a39f..38e1c85 100644
--- a/libminifi/src/io/DescriptorStream.cpp
+++ b/libminifi/src/io/DescriptorStream.cpp
@@ -18,7 +18,6 @@
 
 #include "io/DescriptorStream.h"
 #include <fstream>
-#include <unistd.h>
 #include <vector>
 #include <memory>
 #include <string>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/Serializable.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/Serializable.cpp 
b/libminifi/src/io/Serializable.cpp
index 7d3c79a..081f3db 100644
--- a/libminifi/src/io/Serializable.cpp
+++ b/libminifi/src/io/Serializable.cpp
@@ -16,13 +16,17 @@
  * limitations under the License.
  */
 #include "io/Serializable.h"
-#include <arpa/inet.h>
 #include <cstdio>
 #include <iostream>
 #include <vector>
 #include <string>
 #include <algorithm>
 #include "io/DataStream.h"
+#ifdef WIN32
+#include "Winsock2.h"
+#else
+#include <arpa/inet.h>
+#endif
 namespace org {
 namespace apache {
 namespace nifi {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/ServerSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/ServerSocket.cpp 
b/libminifi/src/io/ServerSocket.cpp
index bfb098f..cf79e0c 100644
--- a/libminifi/src/io/ServerSocket.cpp
+++ b/libminifi/src/io/ServerSocket.cpp
@@ -17,13 +17,16 @@
  */
 #include "io/ServerSocket.h"
 #include "io/DescriptorStream.h"
-#include <netinet/tcp.h>
+
 #include <sys/types.h>
+#ifndef WIN32
+#include <netinet/tcp.h>
 #include <netdb.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <unistd.h>
+#endif
 #include <cstdio>
 #include <memory>
 #include <utility>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/StreamFactory.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/StreamFactory.cpp 
b/libminifi/src/io/StreamFactory.cpp
index a66e4c1..bb021ae 100644
--- a/libminifi/src/io/StreamFactory.cpp
+++ b/libminifi/src/io/StreamFactory.cpp
@@ -98,7 +98,11 @@ StreamFactory::StreamFactory(const 
std::shared_ptr<Configure> &configure) {
   std::string secureStr;
   bool is_secure = false;
   if (configure->get(Configure::nifi_remote_input_secure, secureStr) && 
org::apache::nifi::minifi::utils::StringUtils::StringToBool(secureStr, 
is_secure)) {
+#ifdef OPENSSL_SUPPORT
     delegate_ = std::make_shared<SocketCreator<TLSSocket, 
TLSContext>>(configure);
+#else
+    delegate_ = std::make_shared<SocketCreator<Socket, 
SocketContext>>(configure);
+#endif
   } else {
     delegate_ = std::make_shared<SocketCreator<Socket, 
SocketContext>>(configure);
   }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/posix/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/posix/ClientSocket.cpp 
b/libminifi/src/io/posix/ClientSocket.cpp
new file mode 100644
index 0000000..113f914
--- /dev/null
+++ b/libminifi/src/io/posix/ClientSocket.cpp
@@ -0,0 +1,484 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "io/ClientSocket.h"
+#include <netinet/tcp.h>
+#include <sys/types.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/ioctl.h>
+#include <net/if.h>
+#include <ifaddrs.h>
+#include <unistd.h>
+#include <cstdio>
+#include <memory>
+#include <utility>
+#include <vector>
+#include <cerrno>
+#include <iostream>
+#include <string>
+#include "io/validation.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Builds an unconnected socket description: the constructor only records the
+ * requested host, port and listener count and clears both descriptor sets.
+ * No descriptor is opened here (socket_file_descriptor_ starts at -1).
+ *
+ * @param context not used by this constructor
+ * @param hostname host name stored in requested_hostname_
+ * @param port port stored in port_
+ * @param listeners listener count stored in listeners_. The default of -1 is
+ *        converted to 65535 by the uint16_t parameter type.
+ *        NOTE(review): what listeners_ then holds depends on its declared
+ *        type, which is not visible in this file - confirm against the header.
+ */
+Socket::Socket(const std::shared_ptr<SocketContext> &context, const 
std::string &hostname, const uint16_t port, const uint16_t listeners = -1)
+    : requested_hostname_(hostname),
+      port_(port),
+      addr_info_(0),
+      socket_file_descriptor_(-1),
+      socket_max_(0),
+      total_written_(0),
+      total_read_(0),
+      is_loopback_only_(false),
+      listeners_(listeners),
+      canonical_hostname_(""),
+      nonBlocking_(false),
+      logger_(logging::LoggerFactory<Socket>::getLogger()) {
+  FD_ZERO(&total_list_);
+  FD_ZERO(&read_fds_);
+}
+
+/**
+ * Convenience constructor: delegates to the four-argument constructor with a
+ * listener count of 0.
+ */
+Socket::Socket(const std::shared_ptr<SocketContext> &context, const 
std::string &hostname, const uint16_t port)
+    : Socket(context, hostname, port, 0) {
+}
+
+/**
+ * Constructs a socket from another one, taking over its host name, port,
+ * address list pointer, descriptor, descriptor sets, logger and byte counters.
+ * is_loopback_only_ and nonBlocking_ are not taken from `other`; both are
+ * reset to false.
+ *
+ * Because `other` is a const rvalue reference, std::move() on its members
+ * yields const rvalues and `other` itself is left unchanged: afterwards both
+ * objects hold the same addr_info_ pointer and the same descriptor.
+ * NOTE(review): ~Socket() calls closeStream(), which frees addr_info_ and
+ * closes the descriptor, so destroying both objects would release them twice -
+ * confirm how callers use this constructor.
+ */
+Socket::Socket(const Socket &&other)
+    : requested_hostname_(std::move(other.requested_hostname_)),
+      port_(std::move(other.port_)),
+      is_loopback_only_(false),
+      addr_info_(std::move(other.addr_info_)),
+      socket_file_descriptor_(other.socket_file_descriptor_),
+      socket_max_(other.socket_max_.load()),
+      listeners_(other.listeners_),
+      total_list_(other.total_list_),
+      read_fds_(other.read_fds_),
+      canonical_hostname_(std::move(other.canonical_hostname_)),
+      nonBlocking_(false),
+      logger_(std::move(other.logger_)) {
+  // the counters are assigned in the body, from a load() of the source values
+  total_written_ = other.total_written_.load();
+  total_read_ = other.total_read_.load();
+}
+
+/**
+ * Destructor: releases everything through closeStream().
+ */
+Socket::~Socket() {
+  closeStream();
+}
+
+/**
+ * Releases what this socket holds: the resolved address list, the descriptor,
+ * and the pending byte counters, which are handed to local_network_interface_
+ * before being cleared. Every resource is reset once released, so a repeated
+ * call finds nothing left to do.
+ */
+void Socket::closeStream() {
+  if (addr_info_ != nullptr) {
+    freeaddrinfo(addr_info_);
+    addr_info_ = nullptr;
+  }
+
+  const bool descriptor_open = socket_file_descriptor_ >= 0;
+  if (descriptor_open) {
+    logging::LOG_DEBUG(logger_) << "Closing " << socket_file_descriptor_;
+    close(socket_file_descriptor_);
+    socket_file_descriptor_ = -1;
+  }
+
+  // report the written byte count, then start counting from zero again
+  if (total_written_ > 0) {
+    local_network_interface_.log_write(total_written_);
+    total_written_ = 0;
+  }
+
+  // same for the read byte count
+  if (total_read_ > 0) {
+    local_network_interface_.log_read(total_read_);
+    total_read_ = 0;
+  }
+}
+
+/**
+ * Requests non-blocking mode. The request is only recorded in nonBlocking_
+ * (initialize() reads that flag), and it is ignored when listeners_ is positive.
+ */
+void Socket::setNonBlocking() {
+  if (listeners_ > 0) {
+    return;
+  }
+  nonBlocking_ = true;
+}
+
+/**
+ * Opens a socket for the address candidate `p` and turns it into either a
+ * client connection (listeners_ <= 0) or a bound, listening server socket
+ * (listeners_ > 0).
+ *
+ * @param p address candidate; its ai_addr is rewritten as an AF_INET address
+ *        carrying port_
+ * @param addr IPv4 address used for the connect when the requested host is
+ *        neither empty nor "localhost"
+ * @return 0 on success, -1 on failure. On every failure path the descriptor is
+ *         closed and socket_file_descriptor_ is reset to -1, so a later retry
+ *         with another candidate neither leaks nor reuses it.
+ */
+int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
+  if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1) {
+    logger_->log_error("error while connecting to server socket");
+    return -1;
+  }
+
+  // setSocketOptions() closes the descriptor itself when an option cannot be
+  // applied, so the stale value only has to be forgotten here.
+  if (setSocketOptions(socket_file_descriptor_) < 0) {
+    socket_file_descriptor_ = -1;
+    return -1;
+  }
+
+  if (listeners_ <= 0 && !local_network_interface_.getInterface().empty()) {
+    // bind to local network interface
+    ifaddrs* list = NULL;
+    if (getifaddrs(&list) == 0) {
+      ifaddrs* itemFound = NULL;
+      for (ifaddrs* item = list; item != NULL; item = item->ifa_next) {
+        if ((item->ifa_addr != NULL) && (item->ifa_name != NULL) && (AF_INET == item->ifa_addr->sa_family)
+            && strcmp(item->ifa_name, local_network_interface_.getInterface().c_str()) == 0) {
+          itemFound = item;
+          break;
+        }
+      }
+
+      if (itemFound != NULL) {
+        // a failed bind is only reported; the connect below is still attempted
+        if (bind(socket_file_descriptor_, itemFound->ifa_addr, sizeof(struct sockaddr_in)) < 0)
+          logger_->log_info("Bind to interface %s failed %s", local_network_interface_.getInterface(), strerror(errno));
+        else
+          logger_->log_info("Bind to interface %s", local_network_interface_.getInterface());
+      }
+      freeifaddrs(list);
+    }
+  }
+
+  // both the server and the client path address the peer as AF_INET on port_
+  struct sockaddr_in *sa_loc = reinterpret_cast<struct sockaddr_in*>(p->ai_addr);
+  sa_loc->sin_family = AF_INET;
+  sa_loc->sin_port = htons(port_);
+
+  if (listeners_ > 0) {
+    sa_loc->sin_addr.s_addr = htonl(is_loopback_only_ ? INADDR_LOOPBACK : INADDR_ANY);
+    if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
+      // log before closing: close() may overwrite errno
+      logger_->log_error("Could not bind to socket, reason %s", strerror(errno));
+      close(socket_file_descriptor_);
+      socket_file_descriptor_ = -1;
+      return -1;
+    }
+    // listen
+    if (listen(socket_file_descriptor_, listeners_) == -1) {
+      close(socket_file_descriptor_);
+      socket_file_descriptor_ = -1;
+      return -1;
+    }
+    logger_->log_debug("Created connection with %d listeners", listeners_);
+  } else {
+    // use any address if you are connecting to the local machine for testing
+    // otherwise we must use the requested hostname
+    if (IsNullOrEmpty(requested_hostname_) || requested_hostname_ == "localhost") {
+      sa_loc->sin_addr.s_addr = htonl(is_loopback_only_ ? INADDR_LOOPBACK : INADDR_ANY);
+    } else {
+      sa_loc->sin_addr.s_addr = addr;
+    }
+    if (connect(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
+      close(socket_file_descriptor_);
+      socket_file_descriptor_ = -1;
+      return -1;
+    }
+  }
+
+  // add the listener to the total set
+  FD_SET(socket_file_descriptor_, &total_list_);
+  socket_max_ = socket_file_descriptor_;
+  logger_->log_debug("Created connection with file descriptor %d", socket_file_descriptor_);
+  return 0;
+}
+
+/**
+ * Resolves requested_hostname_ and tries each resolved address candidate in
+ * turn with createConnection() until one succeeds.
+ *
+ * The first canonical name found among the candidates is remembered in
+ * canonical_hostname_. When non-blocking mode was requested, O_NONBLOCK is
+ * added to the descriptor's existing status flags after a successful
+ * connection; failing to do so is logged but does not fail the call.
+ *
+ * @return 0 once a connection was created, -1 when resolution fails, when the
+ *         resolved host address does not fit an IPv4 address, or when no
+ *         candidate could be connected (also the case when port_ is 0).
+ */
+int16_t Socket::initialize() {
+  addrinfo hints;
+  memset(&hints, 0, sizeof hints);  // make sure the struct is empty
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_CANONNAME;
+  if (listeners_ > 0)
+    hints.ai_flags |= AI_PASSIVE;
+  hints.ai_protocol = 0; /* any protocol */
+
+  const int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, &addr_info_);
+  if (errcode != 0) {
+    // getaddrinfo() reports failures through its return code, not through errno
+    logger_->log_error("Saw error during getaddrinfo, error: %s", gai_strerror(errcode));
+    return -1;
+  }
+
+  socket_file_descriptor_ = -1;
+
+  in_addr_t addr = 0;
+  struct hostent *h = nullptr;
+#ifdef __MACH__
+  h = gethostbyname(requested_hostname_.c_str());
+#else
+  char buf[1024];
+  struct hostent he;
+  int hh_errno;
+  gethostbyname_r(requested_hostname_.c_str(), &he, buf, sizeof(buf), &h, &hh_errno);
+#endif
+  // `addr` holds exactly one IPv4 address: refuse anything that would not fit
+  if (h == nullptr || h->h_addr_list[0] == nullptr || h->h_length < 0 || static_cast<size_t>(h->h_length) > sizeof(addr)) {
+    return -1;
+  }
+  memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length);
+
+  for (auto p = addr_info_; p != NULL; p = p->ai_next) {
+    if (IsNullOrEmpty(canonical_hostname_)) {
+      if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname))
+        canonical_hostname_ = p->ai_canonname;
+    }
+    // we've successfully connected
+    if (port_ > 0 && createConnection(p, addr) >= 0) {
+      // Put the socket in non-blocking mode:
+      if (nonBlocking_) {
+        // keep the status flags that are already set instead of replacing them
+        const int flags = fcntl(socket_file_descriptor_, F_GETFL, 0);
+        if (flags < 0 || fcntl(socket_file_descriptor_, F_SETFL, flags | O_NONBLOCK) < 0) {
+          logger_->log_error("Could not create non blocking to socket, error: %s", strerror(errno));
+        } else {
+          logger_->log_debug("Successfully applied O_NONBLOCK to fd");
+        }
+      }
+      logger_->log_debug("Successfully created connection");
+      return 0;
+    }
+  }
+
+  logger_->log_debug("Could not find device for our connection");
+  return -1;
+}
+
+/**
+ * Picks the descriptor the next read or write should use.
+ *
+ * With listeners_ == 0 the socket's own descriptor is returned immediately.
+ * Otherwise select() waits for readability on every tracked descriptor; when
+ * the listening descriptor is the readable one (listeners_ > 0) the pending
+ * connection is accepted, tracked, and its descriptor returned.
+ *
+ * @param msec wait time in milliseconds; 0 waits without a timeout
+ * @return a readable (or newly accepted) descriptor, or -1 when select()
+ *         failed or timed out, or when accept() failed
+ */
+int16_t Socket::select_descriptor(const uint16_t msec) {
+  if (listeners_ == 0) {
+    return socket_file_descriptor_;
+  }
+
+  struct timeval tv;
+  tv.tv_sec = msec / 1000;
+  tv.tv_usec = (msec % 1000) * 1000;
+
+  // take the lock before copying total_list_: accepted descriptors are added
+  // to it further down while this same lock is held
+  std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
+  read_fds_ = total_list_;
+
+  const int ready = select(socket_max_ + 1, &read_fds_, NULL, NULL, msec > 0 ? &tv : nullptr);
+
+  // read_fds_ is only meaningful when select() reported ready descriptors
+  if (ready > 0) {
+    for (int i = 0; i <= socket_max_; i++) {
+      if (!FD_ISSET(i, &read_fds_)) {
+        continue;
+      }
+      if (i != socket_file_descriptor_ || listeners_ <= 0) {
+        // data to be received on i
+        return i;
+      }
+
+      // we have a new connection
+      struct sockaddr_storage remoteaddr;  // client address
+      socklen_t addrlen = sizeof remoteaddr;
+      int newfd = accept(socket_file_descriptor_, (struct sockaddr *) &remoteaddr, &addrlen);
+      if (newfd < 0) {
+        // never track an invalid descriptor
+        logger_->log_error("Could not accept connection on %d, error: %s", socket_file_descriptor_, strerror(errno));
+        return -1;
+      }
+      FD_SET(newfd, &total_list_);  // add to master set
+      if (newfd > socket_max_) {    // keep track of the max
+        socket_max_ = newfd;
+      }
+      return newfd;
+    }
+  }
+
+  logger_->log_debug("Could not find a suitable file descriptor or select timed out");
+
+  return -1;
+}
+
+/**
+ * Applies the platform's socket options to `sock`.
+ *
+ * On __MACH__ only SO_REUSEADDR is set, and only for listening sockets.
+ * Elsewhere TCP_NODELAY, SO_REUSEADDR and a 256 KiB send buffer are set.
+ *
+ * @param sock descriptor to configure
+ * @return 0 on success; -1 when an option could not be set, in which case
+ *         `sock` has already been closed by this function
+ */
+int16_t Socket::setSocketOptions(const int sock) {
+  int enable = 1;
+#ifdef __MACH__
+  if (listeners_ > 0) {
+    // lose the pesky "address already in use" error message
+    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) < 0) {
+      logger_->log_error("setsockopt() SO_REUSEADDR failed");
+      close(sock);
+      return -1;
+    }
+  }
+#else
+  if (setsockopt(sock, SOL_TCP, TCP_NODELAY, &enable, sizeof(enable)) < 0) {
+    logger_->log_error("setsockopt() TCP_NODELAY failed");
+    close(sock);
+    return -1;
+  }
+
+  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(enable)) < 0) {
+    logger_->log_error("setsockopt() SO_REUSEADDR failed");
+    close(sock);
+    return -1;
+  }
+
+  int send_buffer_bytes = 256 * 1024;
+  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &send_buffer_bytes, sizeof(send_buffer_bytes)) < 0) {
+    logger_->log_error("setsockopt() SO_SNDBUF failed");
+    close(sock);
+    return -1;
+  }
+#endif
+  return 0;
+}
+
+/**
+ * Returns a copy of canonical_hostname_. The value stays empty until
+ * initialize() has seen an address candidate that carries a canonical name.
+ */
+std::string Socket::getHostname() const {
+  return canonical_hostname_;
+}
+
+/**
+ * Sends the first `buflen` bytes of `buf`.
+ *
+ * The length is validated against the vector's size (its initialised
+ * elements), not its capacity, so reserved-but-unset storage is never sent.
+ *
+ * @param buf source bytes
+ * @param buflen number of bytes to send
+ * @return the result of the pointer overload, or -1 when `buflen` is negative
+ *         or larger than buf.size()
+ */
+int Socket::writeData(std::vector<uint8_t> &buf, int buflen) {
+  if (buflen < 0 || buf.size() < static_cast<size_t>(buflen))
+    return -1;
+  // data() is valid even for an empty vector, unlike &buf[0]
+  return writeData(buf.data(), buflen);
+}
+
+// data stream overrides
+
+/**
+ * Sends `size` bytes starting at `value`, looping until everything was handed
+ * to send() or an error occurred.
+ *
+ * @param value first byte to send
+ * @param size number of bytes to send
+ * @return the number of bytes sent; -1 when no descriptor is available; the
+ *         failing send() result (<= 0) when sending fails. On a send failure
+ *         the descriptor is closed, and forgotten if it was the socket's own
+ *         descriptor so that closeStream() does not close it a second time.
+ */
+int Socket::writeData(uint8_t *value, int size) {
+  int ret = 0, bytes = 0;
+
+  int fd = select_descriptor(1000);
+  if (fd < 0) {
+    // no usable descriptor: do not call send() or close() on it
+    logger_->log_error("Could not send, no descriptor available");
+    return -1;
+  }
+  while (bytes < size) {
+    ret = send(fd, value + bytes, size - bytes, 0);
+    // check for errors
+    if (ret <= 0) {
+      const int send_errno = errno;  // close() below may overwrite errno
+      close(fd);
+      if (fd == socket_file_descriptor_) {
+        socket_file_descriptor_ = -1;
+      }
+      logger_->log_error("Could not send to %d, error: %s", fd, strerror(send_errno));
+      return ret;
+    }
+    bytes += ret;
+  }
+
+  if (ret)
+    logger_->log_trace("Send data size %d over socket %d", size, fd);
+  total_written_ += bytes;
+  return bytes;
+}
+
+/**
+ * Reads sizeof(T) bytes from the socket into a freshly allocated buffer.
+ *
+ * The result of readData() is not inspected, so after a failed read the
+ * returned buffer still has sizeof(T) bytes, holding whatever was received
+ * on top of the initial zeros.
+ *
+ * @param t only its type is used, to size the buffer
+ * @return buffer of sizeof(T) bytes
+ */
+template<typename T>
+inline std::vector<uint8_t> Socket::readBuffer(const T& t) {
+  std::vector<uint8_t> raw(sizeof t);
+  readData(raw.data(), sizeof(t));
+  return raw;
+}
+
+// Integer write overloads (64, 32 and 16 bit). Each one forwards its value
+// and the is_little_endian flag to Serializable::write, passing `this` along,
+// and returns whatever that call returns.
+int Socket::write(uint64_t base_value, bool is_little_endian) {
+  return Serializable::write(base_value, this, is_little_endian);
+}
+
+int Socket::write(uint32_t base_value, bool is_little_endian) {
+  return Serializable::write(base_value, this, is_little_endian);
+}
+
+int Socket::write(uint16_t base_value, bool is_little_endian) {
+  return Serializable::write(base_value, this, is_little_endian);
+}
+
+/**
+ * Reads eight bytes from the socket and decodes them into `value`.
+ *
+ * @param value receives the decoded integer; left untouched when the read fails
+ * @param is_little_endian when true the first byte received becomes the most
+ *        significant byte of `value`; when false it becomes the least
+ *        significant one
+ * @return sizeof(value) on success; the negative readData() result when the
+ *         read fails, or -1 when fewer bytes than requested were delivered
+ */
+int Socket::read(uint64_t &value, bool is_little_endian) {
+  uint8_t buf[sizeof(value)] = { 0 };
+  const int ret = readData(buf, sizeof(value));
+  if (ret != static_cast<int>(sizeof(value))) {
+    // do not report success (and do not decode stale bytes) after a failed read
+    return ret < 0 ? ret : -1;
+  }
+
+  uint64_t decoded = 0;
+  for (size_t i = 0; i < sizeof(value); ++i) {
+    const size_t shift = 8 * (is_little_endian ? sizeof(value) - 1 - i : i);
+    decoded |= static_cast<uint64_t>(buf[i]) << shift;
+  }
+  value = decoded;
+  return sizeof(value);
+}
+
+/**
+ * Reads four bytes from the socket and decodes them into `value`.
+ *
+ * Each byte is widened to uint32_t before shifting, so a first byte of 0x80
+ * or above is never shifted into the sign bit of a promoted int.
+ *
+ * @param value receives the decoded integer; left untouched when the read fails
+ * @param is_little_endian when true the first byte received becomes the most
+ *        significant byte of `value`; when false it becomes the least
+ *        significant one
+ * @return sizeof(value) on success; the negative readData() result when the
+ *         read fails, or -1 when fewer bytes than requested were delivered
+ */
+int Socket::read(uint32_t &value, bool is_little_endian) {
+  uint8_t buf[sizeof(value)] = { 0 };
+  const int ret = readData(buf, sizeof(value));
+  if (ret != static_cast<int>(sizeof(value))) {
+    // do not report success (and do not decode stale bytes) after a failed read
+    return ret < 0 ? ret : -1;
+  }
+
+  uint32_t decoded = 0;
+  for (size_t i = 0; i < sizeof(value); ++i) {
+    const size_t shift = 8 * (is_little_endian ? sizeof(value) - 1 - i : i);
+    decoded |= static_cast<uint32_t>(buf[i]) << shift;
+  }
+  value = decoded;
+  return sizeof(value);
+}
+
+/**
+ * Reads two bytes from the socket and decodes them into `value`.
+ *
+ * @param value receives the decoded integer; left untouched when the read fails
+ * @param is_little_endian when true the first byte received becomes the high
+ *        byte of `value`; when false it becomes the low byte
+ * @return sizeof(value) on success; the negative readData() result when the
+ *         read fails, or -1 when fewer bytes than requested were delivered
+ */
+int Socket::read(uint16_t &value, bool is_little_endian) {
+  uint8_t buf[sizeof(value)] = { 0 };
+  const int ret = readData(buf, sizeof(value));
+  if (ret != static_cast<int>(sizeof(value))) {
+    // do not report success (and do not decode stale bytes) after a failed read
+    return ret < 0 ? ret : -1;
+  }
+
+  const uint16_t first = buf[0];
+  const uint16_t second = buf[1];
+  value = is_little_endian ? static_cast<uint16_t>((first << 8) | second)
+                           : static_cast<uint16_t>(first | (second << 8));
+  return sizeof(value);
+}
+
+/**
+ * Reads up to `buflen` bytes into `buf`, growing the vector when it holds
+ * fewer than `buflen` elements.
+ *
+ * The check uses size() rather than capacity(): writing through the element
+ * pointer is only valid for elements that exist.
+ *
+ * @param buf destination; resized to at least `buflen` elements
+ * @param buflen number of bytes requested
+ * @param retrieve_all_bytes forwarded to the pointer overload
+ * @return the result of the pointer overload, or -1 for a negative `buflen`
+ */
+int Socket::readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes) {
+  if (buflen < 0) {
+    return -1;
+  }
+  if (buf.size() < static_cast<size_t>(buflen)) {
+    buf.resize(buflen);
+  }
+  // data() is valid even for an empty vector, unlike &buf[0]
+  return readData(buf.data(), buflen, retrieve_all_bytes);
+}
+
+/**
+ * Receives bytes into `buf`.
+ *
+ * @param buf destination with room for at least `buflen` bytes
+ * @param buflen number of bytes requested; nothing is read when it is <= 0
+ * @param retrieve_all_bytes when true, keep receiving until `buflen` bytes
+ *        arrived; when false, return after the first successful recv()
+ * @return number of bytes received; -2 when recv() reports EAGAIN or
+ *         EWOULDBLOCK; -1 when no descriptor became available, the peer hung
+ *         up, or recv() failed otherwise
+ */
+int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
+  int32_t total_read = 0;
+  // "> 0" rather than "!= 0": a negative length must never reach recv(),
+  // where it would be converted to a huge unsigned size
+  while (buflen > 0) {
+    int16_t fd = select_descriptor(1000);
+    if (fd < 0) {
+      if (listeners_ <= 0) {
+        logger_->log_debug("fd %d close %i", fd, buflen);
+        if (socket_file_descriptor_ >= 0) {
+          close(socket_file_descriptor_);
+          // forget the descriptor so closeStream() does not close it again
+          socket_file_descriptor_ = -1;
+        }
+      }
+      return -1;
+    }
+    int bytes_read = recv(fd, buf, buflen, 0);
+    logger_->log_trace("Recv call %d", bytes_read);
+    if (bytes_read <= 0) {
+      if (bytes_read == 0) {
+        logger_->log_debug("Other side hung up on %d", fd);
+      } else {
+        if (errno == EAGAIN || errno == EWOULDBLOCK) {
+          // continue
+          return -2;
+        }
+        logger_->log_error("Could not recv on %d ( port %d), error: %s", fd, port_, strerror(errno));
+      }
+      return -1;
+    }
+    buflen -= bytes_read;
+    buf += bytes_read;
+    total_read += bytes_read;
+    if (!retrieve_all_bytes) {
+      break;
+    }
+  }
+  total_read_ += total_read;
+  return total_read;
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/tls/SecureDescriptorStream.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/tls/SecureDescriptorStream.cpp 
b/libminifi/src/io/tls/SecureDescriptorStream.cpp
index c1ddd8a..9e1334f 100644
--- a/libminifi/src/io/tls/SecureDescriptorStream.cpp
+++ b/libminifi/src/io/tls/SecureDescriptorStream.cpp
@@ -18,7 +18,6 @@
 
 #include "io/tls/SecureDescriptorStream.h"
 #include <fstream>
-#include <unistd.h>
 #include <vector>
 #include <memory>
 #include <string>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/tls/TLSServerSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/tls/TLSServerSocket.cpp 
b/libminifi/src/io/tls/TLSServerSocket.cpp
index f7901d3..22b01a3 100644
--- a/libminifi/src/io/tls/TLSServerSocket.cpp
+++ b/libminifi/src/io/tls/TLSServerSocket.cpp
@@ -17,13 +17,16 @@
  */
 #include "io/tls/SecureDescriptorStream.h"
 #include "io/tls/TLSServerSocket.h"
-#include <netinet/tcp.h>
+
 #include <sys/types.h>
+#ifndef WIN32
+#include <netinet/tcp.h>
 #include <netdb.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include <unistd.h>
+#endif
 #include <cstdio>
 #include <memory>
 #include <chrono>

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/io/win/ClientSocket.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/io/win/ClientSocket.cpp 
b/libminifi/src/io/win/ClientSocket.cpp
new file mode 100644
index 0000000..261c2af
--- /dev/null
+++ b/libminifi/src/io/win/ClientSocket.cpp
@@ -0,0 +1,538 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "io/ClientSocket.h"
+#ifndef WIN32
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <sys/ioctl.h>
+#include <net/if.h>
+#include <ifaddrs.h>
+#include <unistd.h>
+#endif
+#include <sys/types.h>
+
+#include <cstdio>
+#include <memory>
+#include <utility>
+#include <vector>
+#include <cerrno>
+#include <iostream>
+#include <string>
+#include "io/validation.h"
+#include "core/logging/LoggerConfiguration.h"
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace io {
+
+/**
+ * Builds an unconnected socket description: records the requested host, port
+ * and listener count, clears both descriptor sets and then calls
+ * initialize_socket(). No descriptor is opened here (socket_file_descriptor_
+ * starts at -1).
+ *
+ * @param context not used by this constructor
+ * @param hostname host name stored in requested_hostname_
+ * @param port port stored in port_
+ * @param listeners listener count stored in listeners_. The default of -1 is
+ *        converted to 65535 by the uint16_t parameter type.
+ *        NOTE(review): what listeners_ then holds depends on its declared
+ *        type, which is not visible in this file - confirm against the header.
+ */
+Socket::Socket(const std::shared_ptr<SocketContext> &context, const 
std::string &hostname, const uint16_t port, const uint16_t listeners = -1)
+    : requested_hostname_(hostname),
+      port_(port),
+      addr_info_(0),
+      socket_file_descriptor_(-1),
+      socket_max_(0),
+      total_written_(0),
+      total_read_(0),
+      is_loopback_only_(false),
+      listeners_(listeners),
+      canonical_hostname_(""),
+      nonBlocking_(false),
+      logger_(logging::LoggerFactory<Socket>::getLogger()) {
+  FD_ZERO(&total_list_);
+  FD_ZERO(&read_fds_);
+  // NOTE(review): initialize_socket() is defined outside this file; presumably
+  // the platform's socket library setup - confirm
+  initialize_socket();
+}
+
+/**
+ * Convenience constructor: delegates to the four-argument constructor with a
+ * listener count of 0.
+ */
+Socket::Socket(const std::shared_ptr<SocketContext> &context, const std::string &hostname, const uint16_t port)
+    : Socket(context, hostname, port, 0) {
+  // The delegated-to constructor has already called initialize_socket();
+  // repeating the call here would run that setup twice for one object.
+}
+
+/**
+ * Constructs a socket from another one, taking over its host name, port,
+ * address list pointer, descriptor, descriptor sets, logger and byte counters.
+ * is_loopback_only_ and nonBlocking_ are not taken from `other`; both are
+ * reset to false.
+ *
+ * Because `other` is a const rvalue reference, std::move() on its members
+ * yields const rvalues and `other` itself is left unchanged: afterwards both
+ * objects hold the same addr_info_ pointer and the same descriptor.
+ * NOTE(review): ~Socket() calls closeStream(), which frees addr_info_ and
+ * closes the descriptor, so destroying both objects would release them twice -
+ * confirm how callers use this constructor.
+ */
+Socket::Socket(const Socket &&other)
+    : requested_hostname_(std::move(other.requested_hostname_)),
+      port_(std::move(other.port_)),
+      is_loopback_only_(false),
+      addr_info_(std::move(other.addr_info_)),
+      socket_file_descriptor_(other.socket_file_descriptor_),
+      socket_max_(other.socket_max_.load()),
+      listeners_(other.listeners_),
+      total_list_(other.total_list_),
+      read_fds_(other.read_fds_),
+      canonical_hostname_(std::move(other.canonical_hostname_)),
+      nonBlocking_(false),
+      logger_(std::move(other.logger_)) {
+  // the counters are assigned in the body, from a load() of the source values
+  total_written_ = other.total_written_.load();
+  total_read_ = other.total_read_.load();
+}
+
+/**
+ * Destructor: releases everything through closeStream().
+ */
+Socket::~Socket() {
+  closeStream();
+}
+
+/**
+ * Releases what this socket holds: the resolved address list, the descriptor
+ * (closesocket() on WIN32, close() elsewhere), and the pending byte counters,
+ * which are handed to local_network_interface_ before being cleared. Every
+ * resource is reset once released, so a repeated call finds nothing to do.
+ */
+void Socket::closeStream() {
+  if (addr_info_ != nullptr) {
+    freeaddrinfo(addr_info_);
+    addr_info_ = nullptr;
+  }
+
+  const bool descriptor_open = socket_file_descriptor_ != INVALID_SOCKET && socket_file_descriptor_ >= 0;
+  if (descriptor_open) {
+    logging::LOG_DEBUG(logger_) << "Closing " << socket_file_descriptor_;
+#ifdef WIN32
+    closesocket(socket_file_descriptor_);
+#else
+    close(socket_file_descriptor_);
+#endif
+    socket_file_descriptor_ = -1;
+  }
+
+  // report the written byte count, then start counting from zero again
+  if (total_written_ > 0) {
+    local_network_interface_.log_write(total_written_);
+    total_written_ = 0;
+  }
+
+  // same for the read byte count
+  if (total_read_ > 0) {
+    local_network_interface_.log_read(total_read_);
+    total_read_ = 0;
+  }
+}
+
+/**
+ * Requests non-blocking mode. The request is only recorded in nonBlocking_
+ * (initialize() reads that flag), and it is ignored when listeners_ is positive.
+ */
+void Socket::setNonBlocking() {
+  if (listeners_ > 0) {
+    return;
+  }
+  nonBlocking_ = true;
+}
+/**
+ * Opens a socket for the address candidate `p` and turns it into either a
+ * client connection (listeners_ <= 0) or a bound, listening server socket
+ * (listeners_ > 0).
+ *
+ * @param p address candidate; in server mode its ai_addr is rewritten as an
+ *        AF_INET address carrying port_
+ * @param addr IPv4 address used for the connect when the requested host is
+ *        neither empty nor "localhost"
+ * @return 0 on success, -1 on failure. On every failure path only the
+ *         descriptor is released and reset to -1. The resolved address list is
+ *         left alone because initialize() is still iterating over it when it
+ *         calls this function.
+ */
+#ifdef WIN32
+int8_t Socket::createConnection(const addrinfo *p, struct in_addr &addr) {
+#else
+int8_t Socket::createConnection(const addrinfo *p, in_addr_t &addr) {
+#endif
+  if ((socket_file_descriptor_ = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == INVALID_SOCKET) {
+    logger_->log_error("error while connecting to server socket");
+    return -1;
+  }
+
+  // Releases the descriptor and nothing else. closeStream() must not be used
+  // here: it also frees addr_info_, which the caller is still walking.
+  auto discard_socket = [this]() {
+#ifdef WIN32
+    closesocket(socket_file_descriptor_);
+#else
+    close(socket_file_descriptor_);
+#endif
+    socket_file_descriptor_ = -1;
+  };
+
+  // setSocketOptions() closes the descriptor itself when an option cannot be
+  // applied, so the stale value only has to be forgotten here.
+  if (setSocketOptions(socket_file_descriptor_) < 0) {
+    socket_file_descriptor_ = -1;
+    return -1;
+  }
+
+  if (listeners_ <= 0 && !local_network_interface_.getInterface().empty()) {
+    // bind to local network interface
+#ifndef WIN32
+    ifaddrs* list = NULL;
+    if (getifaddrs(&list) == 0) {
+      ifaddrs* itemFound = NULL;
+      for (ifaddrs* item = list; item != NULL; item = item->ifa_next) {
+        if ((item->ifa_addr != NULL) && (item->ifa_name != NULL) && (AF_INET == item->ifa_addr->sa_family)
+            && strcmp(item->ifa_name, local_network_interface_.getInterface().c_str()) == 0) {
+          itemFound = item;
+          break;
+        }
+      }
+
+      if (itemFound != NULL) {
+        // a failed bind is only reported; the connect below is still attempted
+        if (bind(socket_file_descriptor_, itemFound->ifa_addr, sizeof(struct sockaddr_in)) < 0)
+          logger_->log_info("Bind to interface %s failed %s", local_network_interface_.getInterface(), strerror(errno));
+        else
+          logger_->log_info("Bind to interface %s", local_network_interface_.getInterface());
+      }
+      freeifaddrs(list);
+    }
+#endif
+  }
+
+  if (listeners_ > 0) {
+    struct sockaddr_in *sa_loc = reinterpret_cast<struct sockaddr_in*>(p->ai_addr);
+    sa_loc->sin_family = AF_INET;
+    sa_loc->sin_port = htons(port_);
+    sa_loc->sin_addr.s_addr = htonl(is_loopback_only_ ? INADDR_LOOPBACK : INADDR_ANY);
+    if (bind(socket_file_descriptor_, p->ai_addr, p->ai_addrlen) == -1) {
+      // log before closing: closing may overwrite errno
+      logger_->log_error("Could not bind to socket, reason %s", strerror(errno));
+      discard_socket();
+      return -1;
+    }
+    // listen
+    if (listen(socket_file_descriptor_, listeners_) == -1) {
+      discard_socket();
+      return -1;
+    }
+    logger_->log_debug("Created connection with %d listeners", listeners_);
+  } else {
+    struct sockaddr_in sa_loc;
+    memset(&sa_loc, 0x00, sizeof(sa_loc));
+    sa_loc.sin_family = AF_INET;
+    sa_loc.sin_port = htons(port_);
+
+    // use any address if you are connecting to the local machine for testing
+    // otherwise we must use the requested hostname
+    if (IsNullOrEmpty(requested_hostname_) || requested_hostname_ == "localhost") {
+      sa_loc.sin_addr.s_addr = htonl(is_loopback_only_ ? INADDR_LOOPBACK : INADDR_ANY);
+    } else {
+#ifdef WIN32
+      sa_loc.sin_addr.s_addr = addr.s_addr;
+#else
+      sa_loc.sin_addr.s_addr = addr;
+#endif
+    }
+    if (connect(socket_file_descriptor_, (struct sockaddr*) &sa_loc, sizeof(sa_loc)) == -1) {
+#ifdef WIN32
+      int err = WSAGetLastError();
+      if (err == WSAEADDRNOTAVAIL) {
+        logger_->log_error("invalid or unknown IP");
+      } else if (err == WSAECONNREFUSED) {
+        logger_->log_error("Connection refused");
+      } else {
+        logger_->log_error("Unknown error");
+      }
+#endif
+      discard_socket();
+      return -1;
+    }
+  }
+
+  // add the listener to the total set
+  FD_SET(socket_file_descriptor_, &total_list_);
+  socket_max_ = socket_file_descriptor_;
+  logger_->log_debug("Created connection with file descriptor %d", socket_file_descriptor_);
+  return 0;
+}
+
+/**
+ * Resolves requested_hostname_ and tries each resolved address candidate in
+ * turn with createConnection() until one succeeds.
+ *
+ * The first canonical name found among the candidates is remembered in
+ * canonical_hostname_. When non-blocking mode was requested it is applied
+ * after a successful connection (O_NONBLOCK added to the existing flags, or
+ * FIONBIO on WIN32); failing to do so is logged but does not fail the call.
+ *
+ * @return 0 once a connection was created, -1 when resolution fails, when the
+ *         resolved host address does not fit the IPv4 address variable, or
+ *         when no candidate could be connected (also the case when port_ is 0).
+ */
+int16_t Socket::initialize() {
+  addrinfo hints;
+  memset(&hints, 0, sizeof hints);  // make sure the struct is empty
+  hints.ai_family = AF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_CANONNAME;
+  if (listeners_ > 0)
+    hints.ai_flags |= AI_PASSIVE;
+  hints.ai_protocol = 0; /* any protocol */
+
+  const int errcode = getaddrinfo(requested_hostname_.c_str(), 0, &hints, &addr_info_);
+  if (errcode != 0) {
+    // getaddrinfo() reports failures through its return code, not through errno
+#ifdef WIN32
+    logger_->log_error("Saw error during getaddrinfo, error: %d", errcode);
+#else
+    logger_->log_error("Saw error during getaddrinfo, error: %s", gai_strerror(errcode));
+#endif
+    return -1;
+  }
+
+  socket_file_descriptor_ = -1;
+
+#ifndef WIN32
+  in_addr_t addr;
+#else
+  in_addr addr;
+#endif
+  memset(&addr, 0, sizeof(addr));
+  struct hostent *h = nullptr;
+#if defined( __MACH__ ) || defined(WIN32)
+  h = gethostbyname(requested_hostname_.c_str());
+#else
+  char buf[1024];
+  struct hostent he;
+  int hh_errno;
+  gethostbyname_r(requested_hostname_.c_str(), &he, buf, sizeof(buf), &h, &hh_errno);
+#endif
+  // `addr` holds exactly one IPv4 address: refuse anything that would not fit
+  if (h == nullptr || h->h_addr_list[0] == nullptr || h->h_length < 0 || static_cast<size_t>(h->h_length) > sizeof(addr)) {
+    return -1;
+  }
+  memcpy(reinterpret_cast<char*>(&addr), h->h_addr_list[0], h->h_length);
+
+  for (auto p = addr_info_; p != NULL; p = p->ai_next) {
+    if (IsNullOrEmpty(canonical_hostname_)) {
+      if (!IsNullOrEmpty(p) && !IsNullOrEmpty(p->ai_canonname))
+        canonical_hostname_ = p->ai_canonname;
+    }
+    // we've successfully connected
+    if (port_ > 0 && createConnection(p, addr) >= 0) {
+      // Put the socket in non-blocking mode:
+      if (nonBlocking_) {
+#ifndef WIN32
+        // keep the status flags that are already set instead of replacing them
+        const int flags = fcntl(socket_file_descriptor_, F_GETFL, 0);
+        if (flags < 0 || fcntl(socket_file_descriptor_, F_SETFL, flags | O_NONBLOCK) < 0) {
+          logger_->log_error("Could not create non blocking to socket, error: %s", strerror(errno));
+        } else {
+          logger_->log_debug("Successfully applied O_NONBLOCK to fd");
+        }
+#else
+        u_long iMode = 1;
+        if (ioctlsocket(socket_file_descriptor_, FIONBIO, &iMode) == NO_ERROR) {
+          logger_->log_debug("Successfully applied O_NONBLOCK to fd");
+        } else {
+          // previously this failure went unreported
+          logger_->log_error("Could not create non blocking to socket, error: %d", WSAGetLastError());
+        }
+#endif
+      }
+      logger_->log_debug("Successfully created connection");
+      return 0;
+    }
+  }
+
+  logger_->log_debug("Could not find device for our connection");
+  return -1;
+}
+
+/**
+ * Picks the descriptor the next read or write should use.
+ *
+ * With listeners_ == 0 the socket's own descriptor is returned immediately.
+ * Otherwise select() waits for readability on every tracked descriptor; when
+ * the listening descriptor is the readable one (listeners_ > 0) the pending
+ * connection is accepted, tracked, and its descriptor returned.
+ *
+ * @param msec wait time in milliseconds; 0 waits without a timeout
+ * @return a readable (or newly accepted) descriptor, or -1 when select()
+ *         failed or timed out, or when accept() failed
+ */
+int16_t Socket::select_descriptor(const uint16_t msec) {
+  if (listeners_ == 0) {
+    return socket_file_descriptor_;
+  }
+
+  struct timeval tv;
+  tv.tv_sec = msec / 1000;
+  tv.tv_usec = (msec % 1000) * 1000;
+
+  // take the lock before copying total_list_: accepted descriptors are added
+  // to it further down while this same lock is held
+  std::lock_guard<std::recursive_mutex> guard(selection_mutex_);
+  read_fds_ = total_list_;
+
+  const int ready = select(socket_max_ + 1, &read_fds_, NULL, NULL, msec > 0 ? &tv : nullptr);
+
+  // read_fds_ is only meaningful when select() reported ready descriptors
+  if (ready > 0) {
+    for (int i = 0; i <= socket_max_; i++) {
+      if (!FD_ISSET(i, &read_fds_)) {
+        continue;
+      }
+      if (i != socket_file_descriptor_ || listeners_ <= 0) {
+        // data to be received on i
+        return i;
+      }
+
+      // we have a new connection
+      struct sockaddr_storage remoteaddr;  // client address
+      socklen_t addrlen = sizeof remoteaddr;
+      int newfd = accept(socket_file_descriptor_, (struct sockaddr *) &remoteaddr, &addrlen);
+      if (newfd < 0) {
+        // never track an invalid descriptor
+        logger_->log_error("Could not accept connection on %d", socket_file_descriptor_);
+        return -1;
+      }
+      FD_SET(newfd, &total_list_);  // add to master set
+      if (newfd > socket_max_) {    // keep track of the max
+        socket_max_ = newfd;
+      }
+      return newfd;
+    }
+  }
+
+  logger_->log_debug("Could not find a suitable file descriptor or select timed out");
+
+  return -1;
+}
+
+/**
+ * Applies the socket options this class relies on to sock.
+ *
+ * Nothing is set when WIN32 is defined. On __MACH__ only SO_REUSEADDR is set,
+ * and only when listeners_ is positive. Elsewhere TCP_NODELAY, SO_REUSEADDR
+ * and a 256 KiB SO_SNDBUF are set.
+ *
+ * @param sock descriptor to configure; it is closed if any option is rejected.
+ * @return 0 on success, -1 after the first rejected option.
+ */
+int16_t Socket::setSocketOptions(const SocketDescriptor sock) {
+  int enabled = 1;
+#ifndef WIN32
+  // common failure path: report the rejected option, drop the socket, signal -1
+  const auto reject = [this, sock](const char *message) -> int16_t {
+    logger_->log_error(message);
+    close(sock);
+    return -1;
+  };
+#ifndef __MACH__
+  if (setsockopt(sock, SOL_TCP, TCP_NODELAY, static_cast<void*>(&enabled), sizeof(enabled)) < 0) {
+    return reject("setsockopt() TCP_NODELAY failed");
+  }
+  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&enabled), sizeof(enabled)) < 0) {
+    return reject("setsockopt() SO_REUSEADDR failed");
+  }
+
+  int send_buffer_bytes = 256 * 1024;
+  if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, reinterpret_cast<char *>(&send_buffer_bytes), sizeof(send_buffer_bytes)) < 0) {
+    return reject("setsockopt() SO_SNDBUF failed");
+  }
+
+#else
+  // lose the pesky "address already in use" error message
+  if (listeners_ > 0 && setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char *>(&enabled), sizeof(enabled)) < 0) {
+    return reject("setsockopt() SO_REUSEADDR failed");
+  }
+#endif
+#endif
+  return 0;
+}
+
+// Returns a copy of canonical_hostname_.
+std::string Socket::getHostname() const {
+  return canonical_hostname_;
+}
+
+/**
+ * Sends the first buflen bytes of buf.
+ *
+ * The length is validated against buf.size() rather than buf.capacity():
+ * storage between size() and capacity() holds no elements and must not be read.
+ *
+ * @param buf source bytes.
+ * @param buflen number of bytes to send.
+ * @return the result of writeData(uint8_t*, int), or -1 when buflen is
+ *         negative or larger than buf.size().
+ */
+int Socket::writeData(std::vector<uint8_t> &buf, int buflen) {
+  if (buflen < 0 || buf.size() < static_cast<size_t>(buflen))
+    return -1;
+  // data() is valid even for an empty vector, unlike &buf[0]
+  return writeData(buf.data(), buflen);
+}
+
+// data stream overrides
+
+/**
+ * Sends size bytes starting at value, looping until everything is sent.
+ *
+ * The descriptor comes from select_descriptor(1000). On a send() failure the
+ * descriptor is closed and the send() result is returned.
+ *
+ * @param value first byte to send.
+ * @param size number of bytes to send.
+ * @return number of bytes sent, -1 when no descriptor is available, or the
+ *         non-positive send() result on failure.
+ */
+int Socket::writeData(uint8_t *value, int size) {
+  int ret = 0, bytes = 0;
+
+  int fd = select_descriptor(1000);
+  if (fd < 0) {
+    // nothing can be sent on, or closed for, an invalid descriptor
+    logger_->log_error("Could not find a descriptor to send on");
+    return -1;
+  }
+  while (bytes < size) {
+#ifdef WIN32
+    const char *vts = reinterpret_cast<const char*>(value);
+    ret = send(fd, vts + bytes, size - bytes, 0);
+#else
+    ret = send(fd, value + bytes, size - bytes, 0);
+#endif
+    // check for errors
+    if (ret <= 0) {
+      // capture errno first: close() may overwrite it before it is logged
+      const int send_errno = errno;
+      close(fd);
+      logger_->log_error("Could not send to %d, error: %s", fd, strerror(send_errno));
+      return ret;
+    }
+    bytes += ret;
+  }
+
+  if (ret)
+    logger_->log_trace("Send data size %d over socket %d", size, fd);
+  total_written_ += bytes;
+  return bytes;
+}
+
+/**
+ * Reads sizeof(t) bytes from the socket into a freshly allocated buffer.
+ *
+ * Only the size of t is used. The result of readData is not inspected, so the
+ * returned buffer keeps its zero-initialised contents wherever nothing was
+ * read.
+ */
+template<typename T>
+inline std::vector<uint8_t> Socket::readBuffer(const T& t) {
+  std::vector<uint8_t> raw(sizeof(t));
+  readData(reinterpret_cast<uint8_t *>(raw.data()), sizeof(t));
+  return raw;
+}
+
+// Forwards a 64-bit value to Serializable::write, passing this socket as the
+// second argument, and returns that call's result.
+int Socket::write(uint64_t base_value, bool is_little_endian) {
+  return Serializable::write(base_value, this, is_little_endian);
+}
+
+// Forwards a 32-bit value to Serializable::write, passing this socket as the
+// second argument, and returns that call's result.
+int Socket::write(uint32_t base_value, bool is_little_endian) {
+  return Serializable::write(base_value, this, is_little_endian);
+}
+
+// Forwards a 16-bit value to Serializable::write, passing this socket as the
+// second argument, and returns that call's result.
+int Socket::write(uint16_t base_value, bool is_little_endian) {
+  return Serializable::write(base_value, this, is_little_endian);
+}
+
+/**
+ * Reads eight bytes from the socket and assembles them into value.
+ *
+ * When is_little_endian is true the first byte received becomes the most
+ * significant byte of value; otherwise it becomes the least significant.
+ *
+ * @param value receives the decoded number; left untouched when the read fails.
+ * @param is_little_endian selects the byte order described above.
+ * @return sizeof(value) on success, otherwise the non-positive readData result.
+ */
+int Socket::read(uint64_t &value, bool is_little_endian) {
+  uint8_t buf[sizeof(value)] = { 0 };
+  const int ret = readData(buf, sizeof(value));
+  if (ret <= 0) {
+    // do not report a failed read as a successfully decoded zero
+    return ret;
+  }
+
+  uint64_t decoded = 0;
+  for (size_t i = 0; i < sizeof(value); ++i) {
+    const size_t shift = 8 * (is_little_endian ? (sizeof(value) - 1 - i) : i);
+    decoded |= static_cast<uint64_t>(buf[i]) << shift;
+  }
+  value = decoded;
+  return sizeof(value);
+}
+
+/**
+ * Reads four bytes from the socket and assembles them into value.
+ *
+ * When is_little_endian is true the first byte received becomes the most
+ * significant byte of value; otherwise it becomes the least significant.
+ *
+ * @param value receives the decoded number; left untouched when the read fails.
+ * @param is_little_endian selects the byte order described above.
+ * @return sizeof(value) on success, otherwise the non-positive readData result.
+ */
+int Socket::read(uint32_t &value, bool is_little_endian) {
+  uint8_t buf[sizeof(value)] = { 0 };
+  const int ret = readData(buf, sizeof(value));
+  if (ret <= 0) {
+    // do not report a failed read as a successfully decoded zero
+    return ret;
+  }
+
+  uint32_t decoded = 0;
+  for (size_t i = 0; i < sizeof(value); ++i) {
+    const size_t shift = 8 * (is_little_endian ? (sizeof(value) - 1 - i) : i);
+    // widen to unsigned before shifting so a high bit never lands in a signed int
+    decoded |= static_cast<uint32_t>(buf[i]) << shift;
+  }
+  value = decoded;
+  return sizeof(value);
+}
+
+/**
+ * Reads two bytes from the socket and assembles them into value.
+ *
+ * When is_little_endian is true the first byte received becomes the most
+ * significant byte of value; otherwise it becomes the least significant.
+ *
+ * @param value receives the decoded number; left untouched when the read fails.
+ * @param is_little_endian selects the byte order described above.
+ * @return sizeof(value) on success, otherwise the non-positive readData result.
+ */
+int Socket::read(uint16_t &value, bool is_little_endian) {
+  uint8_t buf[sizeof(value)] = { 0 };
+  const int ret = readData(buf, sizeof(value));
+  if (ret <= 0) {
+    // do not report a failed read as a successfully decoded zero
+    return ret;
+  }
+
+  if (is_little_endian) {
+    value = static_cast<uint16_t>((buf[0] << 8) | buf[1]);
+  } else {
+    value = static_cast<uint16_t>(buf[0] | (buf[1] << 8));
+  }
+  return sizeof(value);
+}
+
+/**
+ * Reads up to buflen bytes into buf, growing buf when it is too small.
+ *
+ * The vector is sized by size(), not capacity(): bytes written between size()
+ * and capacity() are not elements of the vector and are overwritten with zeros
+ * if the caller later resizes it.
+ *
+ * @param buf destination; resized to buflen when it holds fewer elements.
+ * @param buflen number of bytes requested.
+ * @param retrieve_all_bytes forwarded to readData(uint8_t*, int, bool).
+ * @return the result of readData(uint8_t*, int, bool), or -1 when buflen is
+ *         negative.
+ */
+int Socket::readData(std::vector<uint8_t> &buf, int buflen, bool retrieve_all_bytes) {
+  if (buflen < 0) {
+    return -1;
+  }
+  if (buf.size() < static_cast<size_t>(buflen)) {
+    buf.resize(buflen);
+  }
+  // data() is valid even for an empty vector, unlike &buf[0]
+  return readData(buf.data(), buflen, retrieve_all_bytes);
+}
+
+/**
+ * Receives bytes into buf from a descriptor chosen by select_descriptor(1000).
+ *
+ * @param buf destination for the received bytes.
+ * @param buflen number of bytes requested.
+ * @param retrieve_all_bytes when true, keeps receiving until buflen bytes have
+ *        arrived; when false, returns after the first successful recv().
+ * @return number of bytes received; -2 when recv() fails with EAGAIN or
+ *         EWOULDBLOCK; -1 when no descriptor is available, the peer closed
+ *         the connection, or recv() fails for another reason.
+ */
+int Socket::readData(uint8_t *buf, int buflen, bool retrieve_all_bytes) {
+  int32_t received_so_far = 0;
+  while (buflen != 0) {
+    const int16_t ready_fd = select_descriptor(1000);
+    if (ready_fd < 0) {
+      if (listeners_ <= 0) {
+        logger_->log_debug("fd %d close %i", ready_fd, buflen);
+        close(socket_file_descriptor_);
+      }
+      return -1;
+    }
+#ifdef WIN32
+    const int chunk = recv(ready_fd, reinterpret_cast<char*>(buf), buflen, 0);
+#else
+    const int chunk = recv(ready_fd, buf, buflen, 0);
+#endif
+    logger_->log_trace("Recv call %d", chunk);
+    if (chunk == 0) {
+      logger_->log_debug("Other side hung up on %d", ready_fd);
+      return -1;
+    }
+    if (chunk < 0) {
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        // the caller may retry later
+        return -2;
+      }
+      logger_->log_error("Could not recv on %d, error: %s", ready_fd, strerror(errno));
+      return -1;
+    }
+    buf += chunk;
+    buflen -= chunk;
+    received_so_far += chunk;
+    if (!retrieve_all_bytes) {
+      break;
+    }
+  }
+  total_read_ += received_so_far;
+  return received_so_far;
+}
+
+} /* namespace io */
+} /* namespace minifi */
+} /* namespace nifi */
+} /* namespace apache */
+} /* namespace org */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/processors/AppendHostInfo.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/AppendHostInfo.cpp 
b/libminifi/src/processors/AppendHostInfo.cpp
index bbc86a5..8769de2 100644
--- a/libminifi/src/processors/AppendHostInfo.cpp
+++ b/libminifi/src/processors/AppendHostInfo.cpp
@@ -20,14 +20,7 @@
 #include "processors/AppendHostInfo.h"
 #define __USE_POSIX
 #include <limits.h>
-#include <sys/time.h>
 #include <string.h>
-#include <netdb.h>
-#include <netinet/in.h>
-#include <sys/socket.h>
-#include <sys/ioctl.h>
-#include <net/if.h>
-#include <arpa/inet.h>
 #include <memory>
 #include <string>
 #include <set>
@@ -42,6 +35,15 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
+#ifndef WIN32
+#include <netdb.h>
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <sys/ioctl.h>
+#include <net/if.h>
+#include <arpa/inet.h>
+#endif
+
 #ifndef HOST_NAME_MAX
 #define HOST_NAME_MAX 255
 #endif
@@ -80,6 +82,7 @@ void AppendHostInfo::onTrigger(core::ProcessContext *context, 
core::ProcessSessi
   std::string iface;
   context->getProperty(InterfaceName.getName(), iface);
   // Confirm the specified interface name exists on this device
+#ifndef WIN32
   if (if_nametoindex(iface.c_str()) != 0) {
     struct ifreq ifr;
     int fd = socket(AF_INET, SOCK_DGRAM, 0);
@@ -94,6 +97,7 @@ void AppendHostInfo::onTrigger(core::ProcessContext *context, 
core::ProcessSessi
     context->getProperty(IPAttribute.getName(), ipAttribute);
     flow->addAttribute(ipAttribute.c_str(), inet_ntoa(((struct sockaddr_in *) 
&ifr.ifr_addr)->sin_addr));
   }
+#endif
 
   // Transfer to the relationship
   session->transfer(flow, Success);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/processors/ExecuteProcess.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExecuteProcess.cpp 
b/libminifi/src/processors/ExecuteProcess.cpp
index db4399e..6f43e8a 100644
--- a/libminifi/src/processors/ExecuteProcess.cpp
+++ b/libminifi/src/processors/ExecuteProcess.cpp
@@ -42,14 +42,17 @@ namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
-
-core::Property ExecuteProcess::Command("Command", "Specifies the command to be 
executed; if just the name of an executable"
-                                       " is provided, it must be in the user's 
environment PATH.",
-                                       "");
-core::Property ExecuteProcess::CommandArguments("Command Arguments", "The 
arguments to supply to the executable delimited by white space. White "
-                                                "space can be escaped by 
enclosing it in double-quotes.",
-                                                "");
-core::Property ExecuteProcess::WorkingDir("Working Directory", "The directory 
to use as the current working directory when executing the command", "");
+#ifndef WIN32
+core::Property ExecuteProcess::Command(
+    
core::PropertyBuilder::createProperty("Command")->withDescription("Specifies 
the command to be executed; if just the name of an executable"
+                                                                      " is 
provided, it must be in the user's environment 
PATH.")->supportsExpressionLanguage(true)->withDefaultValue("")->build());
+core::Property ExecuteProcess::CommandArguments(
+    core::PropertyBuilder::createProperty("Command 
Arguments")->withDescription("The arguments to supply to the executable 
delimited by white space. White "
+                                                                               
 "space can be escaped by enclosing it in "
+                                                                               
 
"double-quotes.")->supportsExpressionLanguage(true)->withDefaultValue("")->build());
+core::Property ExecuteProcess::WorkingDir(
+    core::PropertyBuilder::createProperty("Working 
Directory")->withDescription("The directory to use as the current working 
directory when executing the command")->supportsExpressionLanguage(true)
+        ->withDefaultValue("")->build());
 core::Property ExecuteProcess::BatchDuration("Batch Duration", "If the process 
is expected to be long-running and produce textual output, a "
                                              "batch duration can be 
specified.",
                                              "0");
@@ -74,13 +77,13 @@ void ExecuteProcess::initialize() {
 void ExecuteProcess::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session) {
   std::string value;
   std::shared_ptr<core::FlowFile> flow_file;
-  if (context->getProperty(Command.getName(), value, flow_file)) {
+  if (context->getProperty(Command, value, flow_file)) {
     this->_command = value;
   }
-  if (context->getProperty(CommandArguments.getName(), value, flow_file)) {
+  if (context->getProperty(CommandArguments, value, flow_file)) {
     this->_commandArgument = value;
   }
-  if (context->getProperty(WorkingDir.getName(), value, flow_file)) {
+  if (context->getProperty(WorkingDir, value, flow_file)) {
     this->_workingDir = value;
   }
   if (context->getProperty(BatchDuration.getName(), value)) {
@@ -229,7 +232,7 @@ void ExecuteProcess::onTrigger(core::ProcessContext 
*context, core::ProcessSessi
     }
   }
 }
-
+#endif
 } /* namespace processors */
 } /* namespace minifi */
 } /* namespace nifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/processors/ExtractText.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/ExtractText.cpp 
b/libminifi/src/processors/ExtractText.cpp
index 2175415..328a476 100644
--- a/libminifi/src/processors/ExtractText.cpp
+++ b/libminifi/src/processors/ExtractText.cpp
@@ -41,78 +41,78 @@ core::Property ExtractText::SizeLimit("Size Limit", 
"Maximum number of bytes to
 core::Relationship ExtractText::Success("success", "success operational on the 
flow record");
 
 void ExtractText::initialize() {
-    //! Set the supported properties
-    std::set<core::Property> properties;
-    properties.insert(Attribute);
-    setSupportedProperties(properties);
-    //! Set the supported relationships
-    std::set<core::Relationship> relationships;
-    relationships.insert(Success);
-    setSupportedRelationships(relationships);
+  //! Set the supported properties
+  std::set<core::Property> properties;
+  properties.insert(Attribute);
+  setSupportedProperties(properties);
+  //! Set the supported relationships
+  std::set<core::Relationship> relationships;
+  relationships.insert(Success);
+  setSupportedRelationships(relationships);
 }
 
 void ExtractText::onTrigger(core::ProcessContext *context, 
core::ProcessSession *session) {
-    std::shared_ptr<core::FlowFile> flowFile = session->get();
+  std::shared_ptr<core::FlowFile> flowFile = session->get();
 
-    if (!flowFile) {
-        return;
-    }
+  if (!flowFile) {
+    return;
+  }
 
-    ReadCallback cb(flowFile, context);
-    session->read(flowFile, &cb);
-    session->transfer(flowFile, Success);
+  ReadCallback cb(flowFile, context);
+  session->read(flowFile, &cb);
+  session->transfer(flowFile, Success);
 }
 
 int64_t ExtractText::ReadCallback::process(std::shared_ptr<io::BaseStream> 
stream) {
-    int64_t ret = 0;
-    uint64_t size_limit = flowFile_->getSize();
-    uint64_t read_size = 0;
-    uint64_t loop_read = max_read_;
-
-    std::string attrKey, sizeLimitStr;
-    ctx_->getProperty(Attribute.getName(), attrKey);
-    ctx_->getProperty(SizeLimit.getName(), sizeLimitStr);
-
-    if (sizeLimitStr == "")
-        size_limit = DEFAULT_SIZE_LIMIT;
-    else if (sizeLimitStr != "0")
-        size_limit = std::stoi(sizeLimitStr);
-
-    std::ostringstream contentStream;
-    std::string contentStr;
-
-    while (read_size < size_limit) {
-        if (size_limit - read_size < (uint64_t)max_read_)
-            loop_read = size_limit - read_size;
-
-        ret = stream->readData(buffer_, loop_read);
-        buffer_.resize(ret);
-
-        if (ret < 0) {
-            return -1;
-        }
-
-        if (ret > 0) {
-            contentStream.write(reinterpret_cast<const char*>(&buffer_[0]), 
ret);
-            if (contentStream.fail()) {
-                return -1;
-            }
-        } else {
-            break;
-        }
+  int64_t ret = 0;
+  uint64_t size_limit = flowFile_->getSize();
+  uint64_t read_size = 0;
+  uint64_t loop_read = max_read_;
+
+  std::string attrKey, sizeLimitStr;
+  ctx_->getProperty(Attribute.getName(), attrKey);
+  ctx_->getProperty(SizeLimit.getName(), sizeLimitStr);
+
+  if (sizeLimitStr == "")
+    size_limit = DEFAULT_SIZE_LIMIT;
+  else if (sizeLimitStr != "0")
+    size_limit = std::stoi(sizeLimitStr);
+
+  std::ostringstream contentStream;
+  std::string contentStr;
+
+  while (read_size < size_limit) {
+    if (size_limit - read_size < (uint64_t) max_read_)
+      loop_read = size_limit - read_size;
+
+    ret = stream->readData(buffer_, loop_read);
+    buffer_.resize(ret);
+
+    if (ret < 0) {
+      return -1;
+    }
+
+    if (ret > 0) {
+      contentStream.write(reinterpret_cast<const char*>(&buffer_[0]), ret);
+      if (contentStream.fail()) {
+        return -1;
+      }
+    } else {
+      break;
     }
+  }
 
-    contentStr = contentStream.str();
-    flowFile_->setAttribute(attrKey, contentStr);
-    return read_size;
+  contentStr = contentStream.str();
+  flowFile_->setAttribute(attrKey, contentStr);
+  return read_size;
 }
 
 ExtractText::ReadCallback::ReadCallback(std::shared_ptr<core::FlowFile> 
flowFile, core::ProcessContext *ctx)
-    : max_read_(getpagesize()),
+    : max_read_(4096),
       flowFile_(flowFile),
       ctx_(ctx) {
-          buffer_.resize(max_read_);
-      }
+  buffer_.resize(max_read_);
+}
 
 } /* namespace processors */
 } /* namespace minifi */

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/processors/GenerateFlowFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GenerateFlowFile.cpp 
b/libminifi/src/processors/GenerateFlowFile.cpp
index 82abe82..223346f 100644
--- a/libminifi/src/processors/GenerateFlowFile.cpp
+++ b/libminifi/src/processors/GenerateFlowFile.cpp
@@ -18,7 +18,6 @@
  * limitations under the License.
  */
 #include "processors/GenerateFlowFile.h"
-#include <sys/time.h>
 #include <time.h>
 #include <vector>
 #include <queue>
@@ -29,6 +28,10 @@
 #include <chrono>
 #include <thread>
 #include <random>
+#ifdef WIN32
+#define srandom srand
+#define random rand
+#endif
 #include "utils/StringUtils.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
@@ -46,7 +49,7 @@ core::Property GenerateFlowFile::DataFormat("Data Format", 
"Specifies whether th
 core::Property GenerateFlowFile::UniqueFlowFiles("Unique FlowFiles", "If true, 
each FlowFile that is generated will be unique. If false, a random value will 
be generated and all FlowFiles", "true");
 core::Relationship GenerateFlowFile::Success("success", "success operational 
on the flow record");
 const unsigned int TEXT_LEN = 90;
-static const char TEXT_CHARS[TEXT_LEN+1] = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t
 ";
+static const char TEXT_CHARS[TEXT_LEN + 1] = 
"abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()-_=+/?.,';:\"?<>\n\t
 ";
 
 void GenerateFlowFile::initialize() {
   // Set the supported properties

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/2b0a55e4/libminifi/src/processors/GetFile.cpp
----------------------------------------------------------------------
diff --git a/libminifi/src/processors/GetFile.cpp 
b/libminifi/src/processors/GetFile.cpp
index 4261d1a..89e50fc 100644
--- a/libminifi/src/processors/GetFile.cpp
+++ b/libminifi/src/processors/GetFile.cpp
@@ -16,15 +16,16 @@
  * limitations under the License.
  */
 #include "processors/GetFile.h"
-#include <sys/time.h>
 #include <sys/types.h>
 #include <sys/stat.h>
 #include <time.h>
 #include <stdio.h>
-#include <dirent.h>
 #include <limits.h>
-#include <unistd.h>
+#ifndef WIN32
 #include <regex.h>
+#else
+#include <regex>
+#endif
 #include <vector>
 #include <queue>
 #include <map>
@@ -34,10 +35,18 @@
 #include <string>
 #include <iostream>
 #include "utils/StringUtils.h"
+#include "utils/file/FileUtils.h"
 #include "utils/TimeUtil.h"
 #include "core/ProcessContext.h"
 #include "core/ProcessSession.h"
 
+#ifndef S_ISDIR
+#define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
+#endif
+#define R_OK    4       /* Test for read permission.  */
+#define W_OK    2       /* Test for write permission.  */
+#define F_OK    0       /* Test for existence.  */
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -45,7 +54,9 @@ namespace minifi {
 namespace processors {
 
 core::Property GetFile::BatchSize("Batch Size", "The maximum number of files 
to pull in each iteration", "10");
-core::Property GetFile::Directory("Input Directory", "The input directory from 
which to pull files", ".", true, "", {}, {});
+core::Property GetFile::Directory(
+    core::PropertyBuilder::createProperty("Input 
Directory")->withDescription("The input directory from which to pull 
files")->isRequired(true)->supportsExpressionLanguage(true)->withDefaultValue(".")
+        ->build());
 core::Property GetFile::IgnoreHiddenFile("Ignore Hidden Files", "Indicates 
whether or not hidden files should be ignored", "true");
 core::Property GetFile::KeepSourceFile("Keep Source File", "If true, the file 
is not deleted after it has been copied to the Content Repository", "false");
 core::Property GetFile::MaxAge("Maximum File Age", "The minimum age that a 
file must be in order to be pulled;"
@@ -137,7 +148,7 @@ void GetFile::onTrigger(core::ProcessContext *context, 
core::ProcessSession *ses
     if (request_.pollInterval == 0 || (getTimeMillis() - last_listing_time_) > 
request_.pollInterval) {
       std::string directory;
       const std::shared_ptr<core::FlowFile> flow_file;
-      if (!context->getProperty(Directory.getName(), directory, flow_file)) {
+      if (!context->getProperty(Directory, directory, flow_file)) {
         logger_->log_warn("Resolved missing Input Directory property value");
       }
       performListing(directory, request_);
@@ -203,10 +214,10 @@ bool GetFile::acceptFile(std::string fullName, 
std::string name, const GetFileRe
   struct stat statbuf;
 
   if (stat(fullName.c_str(), &statbuf) == 0) {
-    if (request.minSize > 0 && statbuf.st_size < (int32_t)request.minSize)
+    if (request.minSize > 0 && statbuf.st_size < (int32_t) request.minSize)
       return false;
 
-    if (request.maxSize > 0 && statbuf.st_size > (int32_t)request.maxSize)
+    if (request.maxSize > 0 && statbuf.st_size > (int32_t) request.maxSize)
       return false;
 
     uint64_t modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
@@ -224,7 +235,7 @@ bool GetFile::acceptFile(std::string fullName, std::string 
name, const GetFileRe
 
     if (request.keepSourceFile == false && access(fullName.c_str(), W_OK) != 0)
       return false;
-
+#ifndef WIN32
     regex_t regex;
     int ret = regcomp(&regex, request.fileFilter.c_str(), 0);
     if (ret)
@@ -233,6 +244,12 @@ bool GetFile::acceptFile(std::string fullName, std::string 
name, const GetFileRe
     regfree(&regex);
     if (ret)
       return false;
+#else
+    std::regex regex(request.fileFilter);
+    if (!std::regex_match(name, regex)) {
+      return false;
+    }
+#endif
     metrics_->input_bytes_ += statbuf.st_size;
     metrics_->accepted_files_++;
     return true;
@@ -242,6 +259,7 @@ bool GetFile::acceptFile(std::string fullName, std::string 
name, const GetFileRe
 }
 
 void GetFile::performListing(std::string dir, const GetFileRequest &request) {
+#ifndef WIN32
   DIR *d;
   d = opendir(dir.c_str());
   if (!d)
@@ -255,7 +273,7 @@ void GetFile::performListing(std::string dir, const 
GetFileRequest &request) {
       break;
     std::string d_name = entry->d_name;
     std::string path = dir + "/" + d_name;
-    struct stat statbuf{};
+    struct stat statbuf { };
     if (stat(path.c_str(), &statbuf) != 0) {
       logger_->log_warn("Failed to stat %s", path);
       break;
@@ -273,6 +291,33 @@ void GetFile::performListing(std::string dir, const 
GetFileRequest &request) {
     }
   }
   closedir(d);
+#else
+  HANDLE hFind;
+  WIN32_FIND_DATA FindFileData;
+
+  if ((hFind = FindFirstFile(dir.c_str(), &FindFileData)) != 
INVALID_HANDLE_VALUE) {
+    do {
+      struct stat statbuf {};
+      if (stat(FindFileData.cFileName, &statbuf) != 0) {
+        logger_->log_warn("Failed to stat %s", FindFileData.cFileName);
+        break;
+      }
+
+      std::string path = dir + "/" + FindFileData.cFileName;
+      if (S_ISDIR(statbuf.st_mode)) {
+        if (request.recursive && strcmp(FindFileData.cFileName, "..") != 0 && 
strcmp(FindFileData.cFileName, ".") != 0) {
+          performListing(path, request);
+        }
+      } else {
+        if (acceptFile(path, FindFileData.cFileName, request)) {
+          // check whether we can take this file
+          putListing(path);
+        }
+      }
+    }while (FindNextFile(hFind, &FindFileData));
+    FindClose(hFind);
+  }
+#endif
 }
 
 int16_t 
GetFile::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>
 &metric_vector) {

Reply via email to