http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowControlProtocol.cpp
----------------------------------------------------------------------
diff --git a/src/FlowControlProtocol.cpp b/src/FlowControlProtocol.cpp
new file mode 100644
index 0000000..6aaa969
--- /dev/null
+++ b/src/FlowControlProtocol.cpp
@@ -0,0 +1,540 @@
+/**
+ * @file FlowControlProtocol.cpp
+ * FlowControlProtocol class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <sys/time.h>
+#include <stdio.h>
+#include <time.h>
+#include <chrono>
+#include <thread>
+#include <random>
+#include <netinet/tcp.h>
+#include <iostream>
+#include "FlowController.h"
+#include "FlowControlProtocol.h"
+
+int FlowControlProtocol::connectServer(const char *host, uint16_t port)
+{
+       in_addr_t addr;
+       int sock = 0;
+       struct hostent *h;
+#ifdef __MACH__
+       h = gethostbyname(host);
+#else
+       char buf[1024];
+       struct hostent he;
+       int hh_errno;
+       gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno);
+#endif
+       memcpy((char *) &addr, h->h_addr_list[0], h->h_length);
+       sock = socket(AF_INET, SOCK_STREAM, 0);
+       if (sock < 0)
+       {
+               _logger->log_error("Could not create socket to hostName %s", 
host);
+               return 0;
+       }
+
+#ifndef __MACH__
+       int opt = 1;
+       bool nagle_off = true;
+
+       if (nagle_off)
+       {
+               if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, 
sizeof(opt)) < 0)
+               {
+                       _logger->log_error("setsockopt() TCP_NODELAY failed");
+                       close(sock);
+                       return 0;
+               }
+               if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
+                               (char *)&opt, sizeof(opt)) < 0)
+               {
+                       _logger->log_error("setsockopt() SO_REUSEADDR failed");
+                       close(sock);
+                       return 0;
+               }
+       }
+
+       int sndsize = 256*1024;
+       if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, 
(int)sizeof(sndsize)) < 0)
+       {
+               _logger->log_error("setsockopt() SO_SNDBUF failed");
+               close(sock);
+               return 0;
+       }
+#endif
+
+       struct sockaddr_in sa;
+       socklen_t socklen;
+       int status;
+
+       memset(&sa, 0, sizeof(sa));
+       sa.sin_family = AF_INET;
+       sa.sin_addr.s_addr = htonl(INADDR_ANY);
+       sa.sin_port = htons(0);
+       socklen = sizeof(sa);
+       if (bind(sock, (struct sockaddr *)&sa, socklen) < 0)
+       {
+               _logger->log_error("socket bind failed");
+               close(sock);
+               return 0;
+       }
+
+       memset(&sa, 0, sizeof(sa));
+       sa.sin_family = AF_INET;
+       sa.sin_addr.s_addr = addr;
+       sa.sin_port = htons(port);
+       socklen = sizeof(sa);
+
+       status = connect(sock, (struct sockaddr *)&sa, socklen);
+
+       if (status < 0)
+       {
+               _logger->log_error("socket connect failed to %s %d", host, 
port);
+               close(sock);
+               return 0;
+       }
+
+       _logger->log_info("Flow Control Protocol socket %d connect to server %s 
port %d success", sock, host, port);
+
+       return sock;
+}
+
+int FlowControlProtocol::sendData(uint8_t *buf, int buflen)
+{
+       int ret = 0, bytes = 0;
+
+       while (bytes < buflen)
+       {
+               ret = send(_socket, buf+bytes, buflen-bytes, 0);
+               //check for errors
+               if (ret == -1)
+               {
+                       return ret;
+               }
+               bytes+=ret;
+       }
+
+       return bytes;
+}
+
+int FlowControlProtocol::selectClient(int msec)
+{
+       fd_set fds;
+       struct timeval tv;
+    int retval;
+    int fd = _socket;
+
+    FD_ZERO(&fds);
+    FD_SET(fd, &fds);
+
+    tv.tv_sec = msec/1000;
+    tv.tv_usec = (msec % 1000) * 1000;
+
+    if (msec > 0)
+       retval = select(fd+1, &fds, NULL, NULL, &tv);
+    else
+       retval = select(fd+1, &fds, NULL, NULL, NULL);
+
+    if (retval <= 0)
+      return retval;
+    if (FD_ISSET(fd, &fds))
+      return retval;
+    else
+      return 0;
+}
+
+int FlowControlProtocol::readData(uint8_t *buf, int buflen)
+{
+       int sendSize = buflen;
+
+       while (buflen)
+       {
+               int status;
+               status = selectClient(MAX_READ_TIMEOUT);
+               if (status <= 0)
+               {
+                       return status;
+               }
+#ifndef __MACH__
+               status = read(_socket, buf, buflen);
+#else
+               status = recv(_socket, buf, buflen, 0);
+#endif
+               if (status <= 0)
+               {
+                       return status;
+               }
+               buflen -= status;
+               buf += status;
+       }
+
+       return sendSize;
+}
+
+int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr)
+{
+       uint8_t buffer[sizeof(FlowControlProtocolHeader)];
+
+       uint8_t *data = buffer;
+
+       int status = readData(buffer, sizeof(FlowControlProtocolHeader));
+       if (status <= 0)
+               return status;
+
+       uint32_t value;
+       data = this->decode(data, value);
+       hdr->msgType = value;
+
+       data = this->decode(data, value);
+       hdr->seqNumber = value;
+
+       data = this->decode(data, value);
+       hdr->status = value;
+
+       data = this->decode(data, value);
+       hdr->payloadLen = value;
+
+       return sizeof(FlowControlProtocolHeader);
+}
+
+void FlowControlProtocol::start()
+{
+       if (_running)
+               return;
+       _running = true;
+       _logger->log_info("FlowControl Protocol Start");
+       _thread = new std::thread(run, this);
+       _thread->detach();
+}
+
+void FlowControlProtocol::stop()
+{
+       if (!_running)
+               return;
+       _running = false;
+       _logger->log_info("FlowControl Protocol Stop");
+       delete _thread;
+}
+
+void FlowControlProtocol::run(FlowControlProtocol *protocol)
+{
+       while (protocol->_running)
+       {
+               
std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval));
+               if (!protocol->_registered)
+               {
+                       // if it is not register yet
+                       protocol->sendRegisterReq();
+                       // protocol->_controller->reload("flow.xml");
+               }
+               else
+                       protocol->sendReportReq();
+       }
+       return;
+}
+
+int FlowControlProtocol::sendRegisterReq()
+{
+       if (_registered)
+       {
+               _logger->log_info("Already registered");
+               return -1;
+       }
+
+       uint16_t port = this->_serverPort;
+
+       if (this->_socket <= 0)
+               this->_socket = connectServer(_serverName.c_str(), port);
+
+       if (this->_socket <= 0)
+               return -1;
+
+       // Calculate the total payload msg size
+       uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 
0) +
+                       FlowControlMsgIDEncodingLen(FLOW_XML_NAME, 
this->_controller->getName().size()+1);
+       uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+
+       uint8_t *data = new uint8_t[size];
+       uint8_t *start = data;
+
+       // encode the HDR
+       FlowControlProtocolHeader hdr;
+       hdr.msgType = REGISTER_REQ;
+       hdr.payloadLen = payloadSize;
+       hdr.seqNumber  = this->_seqNumber;
+       hdr.status = RESP_SUCCESS;
+       data = this->encode(data, hdr.msgType);
+       data = this->encode(data, hdr.seqNumber);
+       data = this->encode(data, hdr.status);
+       data = this->encode(data, hdr.payloadLen);
+
+       // encode the serial number
+       data = this->encode(data, FLOW_SERIAL_NUMBER);
+       data = this->encode(data, this->_serialNumber, 8);
+
+       // encode the XML name
+       data = this->encode(data, FLOW_XML_NAME);
+       data = this->encode(data, this->_controller->getName());
+
+       // send it
+       int status = sendData(start, size);
+       delete[] start;
+       if (status <= 0)
+       {
+               close(_socket);
+               _socket = 0;
+               _logger->log_error("Flow Control Protocol Send Register Req 
failed");
+               return -1;
+       }
+
+       // Looking for register respond
+       status = readHdr(&hdr);
+
+       if (status <= 0)
+       {
+               close(_socket);
+               _socket = 0;
+               _logger->log_error("Flow Control Protocol Read Register Resp 
header failed");
+               return -1;
+       }
+       _logger->log_info("Flow Control Protocol receive MsgType %s", 
FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+       _logger->log_info("Flow Control Protocol receive Seq Num %d", 
hdr.seqNumber);
+       _logger->log_info("Flow Control Protocol receive Resp Code %s", 
FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+       _logger->log_info("Flow Control Protocol receive Payload len %d", 
hdr.payloadLen);
+
+       if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
+       {
+               this->_registered = true;
+               this->_seqNumber++;
+               _logger->log_info("Flow Control Protocol Register success");
+               uint8_t *payload = new uint8_t[hdr.payloadLen];
+               uint8_t *payloadPtr = payload;
+               status = readData(payload, hdr.payloadLen);
+               if (status <= 0)
+               {
+                       delete[] payload;
+                       _logger->log_info("Flow Control Protocol Register Read 
Payload fail");
+                       close(_socket);
+                       _socket = 0;
+                       return -1;
+               }
+               while (payloadPtr < (payload + hdr.payloadLen))
+               {
+                       uint32_t msgID;
+                       payloadPtr = this->decode(payloadPtr, msgID);
+                       if (((FlowControlMsgID) msgID) == REPORT_INTERVAL)
+                       {
+                               // Fixed 4 bytes
+                               uint32_t reportInterval;
+                               payloadPtr = this->decode(payloadPtr, 
reportInterval);
+                               _logger->log_info("Flow Control Protocol 
receive report interval %d ms", reportInterval);
+                               this->_reportInterval = reportInterval;
+                       }
+                       else if (((FlowControlMsgID) msgID) == FLOW_XML_CONTENT)
+                       {
+                               uint32_t xmlLen;
+                               payloadPtr = this->decode(payloadPtr, xmlLen);
+                               _logger->log_info("Flow Control Protocol 
receive XML content length %d", xmlLen);
+                               time_t rawtime;
+                               struct tm *timeinfo;
+                               time(&rawtime);
+                               timeinfo = localtime(&rawtime);
+                               std::string xmlFileName = "flow.";
+                               xmlFileName += asctime(timeinfo);
+                               xmlFileName += ".xml";
+                               std::ofstream fs;
+                               fs.open(xmlFileName.c_str(), std::fstream::out 
| std::fstream::binary | std::fstream::trunc);
+                               if (fs.is_open())
+                               {
+                                       fs.write((const char *)payloadPtr, 
xmlLen);
+                                       fs.close();
+                                       
this->_controller->reload(xmlFileName.c_str());
+                               }
+                       }
+                       else
+                       {
+                               break;
+                       }
+               }
+               delete[] payload;
+               close(_socket);
+               _socket = 0;
+               return 0;
+       }
+       else
+       {
+               _logger->log_info("Flow Control Protocol Register fail");
+               close(_socket);
+               _socket = 0;
+               return -1;
+       }
+}
+
+
+int FlowControlProtocol::sendReportReq()
+{
+       uint16_t port = this->_serverPort;
+
+       if (this->_socket <= 0)
+               this->_socket = connectServer(_serverName.c_str(), port);
+
+       if (this->_socket <= 0)
+               return -1;
+
+       // Calculate the total payload msg size
+       uint32_t payloadSize =
+                       FlowControlMsgIDEncodingLen(FLOW_XML_NAME, 
this->_controller->getName().size()+1);
+       uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize;
+
+       uint8_t *data = new uint8_t[size];
+       uint8_t *start = data;
+
+       // encode the HDR
+       FlowControlProtocolHeader hdr;
+       hdr.msgType = REPORT_REQ;
+       hdr.payloadLen = payloadSize;
+       hdr.seqNumber  = this->_seqNumber;
+       hdr.status = RESP_SUCCESS;
+       data = this->encode(data, hdr.msgType);
+       data = this->encode(data, hdr.seqNumber);
+       data = this->encode(data, hdr.status);
+       data = this->encode(data, hdr.payloadLen);
+
+       // encode the XML name
+       data = this->encode(data, FLOW_XML_NAME);
+       data = this->encode(data, this->_controller->getName());
+
+       // send it
+       int status = sendData(start, size);
+       delete[] start;
+       if (status <= 0)
+       {
+               close(_socket);
+               _socket = 0;
+               _logger->log_error("Flow Control Protocol Send Report Req 
failed");
+               return -1;
+       }
+
+       // Looking for report respond
+       status = readHdr(&hdr);
+
+       if (status <= 0)
+       {
+               close(_socket);
+               _socket = 0;
+               _logger->log_error("Flow Control Protocol Read Report Resp 
header failed");
+               return -1;
+       }
+       _logger->log_info("Flow Control Protocol receive MsgType %s", 
FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+       _logger->log_info("Flow Control Protocol receive Seq Num %d", 
hdr.seqNumber);
+       _logger->log_info("Flow Control Protocol receive Resp Code %s", 
FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+       _logger->log_info("Flow Control Protocol receive Payload len %d", 
hdr.payloadLen);
+
+       if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber)
+       {
+               this->_seqNumber++;
+               uint8_t *payload = new uint8_t[hdr.payloadLen];
+               uint8_t *payloadPtr = payload;
+               status = readData(payload, hdr.payloadLen);
+               if (status <= 0)
+               {
+                       delete[] payload;
+                       _logger->log_info("Flow Control Protocol Report Resp 
Read Payload fail");
+                       close(_socket);
+                       _socket = 0;
+                       return -1;
+               }
+               std::string processor;
+               std::string propertyName;
+               std::string propertyValue;
+               while (payloadPtr < (payload + hdr.payloadLen))
+               {
+                       uint32_t msgID;
+                       payloadPtr = this->decode(payloadPtr, msgID);
+                       if (((FlowControlMsgID) msgID) == PROCESSOR_NAME)
+                       {
+                               uint32_t len;
+                               payloadPtr = this->decode(payloadPtr, len);
+                               processor = (const char *) payloadPtr;
+                               payloadPtr += len;
+                               _logger->log_info("Flow Control Protocol 
receive report resp processor %s", processor.c_str());
+                       }
+                       else if (((FlowControlMsgID) msgID) == PROPERTY_NAME)
+                       {
+                               uint32_t len;
+                               payloadPtr = this->decode(payloadPtr, len);
+                               propertyName = (const char *) payloadPtr;
+                               payloadPtr += len;
+                               _logger->log_info("Flow Control Protocol 
receive report resp property name %s", propertyName.c_str());
+                       }
+                       else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE)
+                       {
+                               uint32_t len;
+                               payloadPtr = this->decode(payloadPtr, len);
+                               propertyValue = (const char *) payloadPtr;
+                               payloadPtr += len;
+                               _logger->log_info("Flow Control Protocol 
receive report resp property value %s", propertyValue.c_str());
+                               
this->_controller->updatePropertyValue(processor, propertyName, propertyValue);
+                       }
+                       else
+                       {
+                               break;
+                       }
+               }
+               delete[] payload;
+               close(_socket);
+               _socket = 0;
+               return 0;
+       }
+       else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == 
this->_seqNumber)
+       {
+               _logger->log_info("Flow Control Protocol trigger reregister");
+               this->_registered = false;
+               this->_seqNumber++;
+               close(_socket);
+               _socket = 0;
+               return 0;
+       }
+       else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == 
this->_seqNumber)
+       {
+               _logger->log_info("Flow Control Protocol stop flow controller");
+               this->_controller->stop(true);
+               this->_seqNumber++;
+               close(_socket);
+               _socket = 0;
+               return 0;
+       }
+       else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == 
this->_seqNumber)
+       {
+               _logger->log_info("Flow Control Protocol start flow 
controller");
+               this->_controller->start();
+               this->_seqNumber++;
+               close(_socket);
+               _socket = 0;
+               return 0;
+       }
+       else
+       {
+               _logger->log_info("Flow Control Protocol Report fail");
+               close(_socket);
+               _socket = 0;
+               return -1;
+       }
+}
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowController.cpp
----------------------------------------------------------------------
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index 0525ccc..a2cafbc 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -44,6 +44,7 @@ FlowController::FlowController(std::string name)
        _initialized = false;
        _root = NULL;
        _logger = Logger::getLogger();
+       _protocol = new FlowControlProtocol(this);
 
        // NiFi config properties
        _configure = Configure::getConfigure();
@@ -58,6 +59,7 @@ FlowController::~FlowController()
 {
        stop(true);
        unload();
+       delete _protocol;
 }
 
 bool FlowController::isRunning()
@@ -75,6 +77,11 @@ void FlowController::stop(bool force)
        if (_running)
        {
                _logger->log_info("Stop Flow Controller");
+               this->_timerScheduler.stop();
+               // Wait for sometime for thread stop
+               std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+               if (this->_root)
+                       this->_root->stopProcessing(&this->_timerScheduler);
                _running = false;
        }
 }
@@ -88,12 +95,36 @@ void FlowController::unload()
        if (_initialized)
        {
                _logger->log_info("Unload Flow Controller");
+               if (_root)
+                       delete _root;
+               _root = NULL;
                _initialized = false;
+               _name = "";
        }
 
        return;
 }
 
+void FlowController::reload(std::string xmlFile)
+{
+       _logger->log_info("Starting to reload Flow Controller with xml %s", 
xmlFile.c_str());
+       stop(true);
+       unload();
+       std::string oldxmlFile = this->_xmlFileName;
+       this->_xmlFileName = xmlFile;
+       load();
+       start();
+       if (!this->_root)
+       {
+               this->_xmlFileName = oldxmlFile;
+               _logger->log_info("Rollback Flow Controller to xml %s", 
oldxmlFile.c_str());
+               stop(true);
+               unload();
+               load();
+               start();
+       }
+}
+
 Processor *FlowController::createProcessor(std::string name, uuid_t uuid)
 {
        Processor *processor = NULL;
@@ -291,6 +322,7 @@ void FlowController::parseRootProcessGroup(xmlDoc *doc, 
xmlNode *node)
                                        }
                                        // Set the root process group
                                        this->_root = group;
+                                       this->_name = name;
                                        xmlFree(name);
                                }
                        }
@@ -504,6 +536,20 @@ void FlowController::parseProcessorNode(xmlDoc *doc, 
xmlNode *processorNode, Pro
                                        xmlFree(temp);
                                }
                        }
+                       else if (xmlStrcmp(currentNode->name, BAD_CAST 
"maxConcurrentTasks") == 0)
+                       {
+                               char *temp = (char *) 
xmlNodeGetContent(currentNode);
+                               if (temp)
+                               {
+                                       int64_t maxConcurrentTasks;
+                                       if (Property::StringToInt(temp, 
maxConcurrentTasks))
+                                       {
+                                               
_logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", 
maxConcurrentTasks);
+                                               
processor->setMaxConcurrentTasks(maxConcurrentTasks);
+                                       }
+                                       xmlFree(temp);
+                               }
+                       }
                        else if (xmlStrcmp(currentNode->name, BAD_CAST 
"runDurationNanos") == 0)
                        {
                                char *temp = (char *) 
xmlNodeGetContent(currentNode);
@@ -548,12 +594,13 @@ void FlowController::load()
        }
        if (!_initialized)
        {
-               _logger->log_info("Load Flow Controller");
+               _logger->log_info("Load Flow Controller from file %s", 
_xmlFileName.c_str());
 
                xmlDoc *doc = xmlReadFile(_xmlFileName.c_str(), NULL, 
XML_PARSE_NONET);
                if (doc == NULL)
                {
                        _logger->log_error("xmlReadFile returned NULL when 
reading [%s]", _xmlFileName.c_str());
+                       _initialized = true;
                        return;
                }
 
@@ -637,6 +684,7 @@ bool FlowController::start()
                        if (this->_root)
                                
this->_root->startProcessing(&this->_timerScheduler);
                        _running = true;
+                       this->_protocol->start();
                }
                return true;
        }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowFileRecord.cpp
----------------------------------------------------------------------
diff --git a/src/FlowFileRecord.cpp b/src/FlowFileRecord.cpp
index 601dcde..2dda47a 100644
--- a/src/FlowFileRecord.cpp
+++ b/src/FlowFileRecord.cpp
@@ -22,6 +22,9 @@
 #include <map>
 #include <sys/time.h>
 #include <time.h>
+#include <iostream>
+#include <fstream>
+#include <cstdio>
 
 #include "FlowFileRecord.h"
 #include "Relationship.h"
@@ -83,6 +86,7 @@ FlowFileRecord::~FlowFileRecord()
                if (_claim->getFlowFileRecordOwnedCount() == 0)
                {
                        _logger->log_debug("Delete Resource Claim %s", 
_claim->getContentFullPath().c_str());
+                       std::remove(_claim->getContentFullPath().c_str());
                        delete _claim;
                }
        }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/LogAttribute.cpp
----------------------------------------------------------------------
diff --git a/src/LogAttribute.cpp b/src/LogAttribute.cpp
index f4548af..67ed74e 100644
--- a/src/LogAttribute.cpp
+++ b/src/LogAttribute.cpp
@@ -25,6 +25,7 @@
 #include <time.h>
 #include <sstream>
 #include <string.h>
+#include <iostream>
 
 #include "TimeUtil.h"
 #include "LogAttribute.h"
@@ -103,6 +104,24 @@ void LogAttribute::onTrigger(ProcessContext *context, 
ProcessSession *session)
     {
        message << "\n" << "Content Claim:" << claim->getContentFullPath();
     }
+    if (logPayload && flow->getSize() <= 1024*1024)
+    {
+       message << "\n" << "Payload:" << "\n";
+       ReadCallback callback(flow->getSize());
+       session->read(flow, &callback);
+       for (int i = 0, j = 0; i < callback._readSize; i++)
+       {
+               char temp[8];
+               sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i]));
+               message << temp;
+               j++;
+               if (j == 16)
+               {
+                       message << '\n';
+                       j = 0;
+               }
+       }
+    }
     message << "\n" << dashLine << std::ends;
     std::string output = message.str();
 
@@ -127,6 +146,12 @@ void LogAttribute::onTrigger(ProcessContext *context, 
ProcessSession *session)
        break;
     }
 
+    // Test Import
+    FlowFileRecord *importRecord = session->create();
+    session->import(claim->getContentFullPath(), importRecord);
+    session->transfer(importRecord, Success);
+
+
     // Transfer to the relationship
     session->transfer(flow, Success);
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/ProcessGroup.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessGroup.cpp b/src/ProcessGroup.cpp
index 78ae1a4..a5fd773 100644
--- a/src/ProcessGroup.cpp
+++ b/src/ProcessGroup.cpp
@@ -46,7 +46,24 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, 
std::string name, uuid_t uuid,
 
 ProcessGroup::~ProcessGroup()
 {
+       for (std::set<Connection *>::iterator it = _connections.begin(); it != 
_connections.end(); ++it)
+       {
+               Connection *connection = *it;
+               connection->drain();
+               delete connection;
+       }
+
+       for (std::set<ProcessGroup *>::iterator it = 
_childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
+       {
+               ProcessGroup *processGroup(*it);
+               delete processGroup;
+       }
 
+       for (std::set<Processor *>::iterator it = _processors.begin(); it != 
_processors.end(); ++it)
+       {
+               Processor *processor(*it);
+               delete processor;
+       }
 }
 
 bool ProcessGroup::isRootProcessGroup()
@@ -198,6 +215,28 @@ Processor *ProcessGroup::findProcessor(uuid_t uuid)
        return ret;
 }
 
+void ProcessGroup::updatePropertyValue(std::string processorName, std::string 
propertyName, std::string propertyValue)
+{
+       std::lock_guard<std::mutex> lock(_mtx);
+
+       for (std::set<Processor *>::iterator it = _processors.begin(); it != 
_processors.end(); ++it)
+       {
+               Processor *processor(*it);
+               if (processor->getName() == processorName)
+               {
+                       processor->setProperty(propertyName, propertyValue);
+               }
+       }
+
+       for (std::set<ProcessGroup *>::iterator it = 
_childProcessGroups.begin(); it != _childProcessGroups.end(); ++it)
+       {
+               ProcessGroup *processGroup(*it);
+               processGroup->updatePropertyValue(processorName, propertyName, 
propertyValue);
+       }
+
+       return;
+}
+
 void ProcessGroup::addConnection(Connection *connection)
 {
        std::lock_guard<std::mutex> lock(_mtx);

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/ProcessSession.cpp
----------------------------------------------------------------------
diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp
index cf8249d..32d8920 100644
--- a/src/ProcessSession.cpp
+++ b/src/ProcessSession.cpp
@@ -207,8 +207,9 @@ void ProcessSession::write(FlowFileRecord *flow, 
OutputStreamCallback *callback)
                                }
                                flow->_claim = claim;
                                claim->increaseFlowFileRecordOwnedCount();
+                               /*
                                _logger->log_debug("Write offset %d length %d 
into content %s for FlowFile UUID %s",
-                                               flow->_offset, flow->_size, 
flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+                                               flow->_offset, flow->_size, 
flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
                                fs.close();
                        }
                        else
@@ -273,8 +274,9 @@ void ProcessSession::append(FlowFileRecord *flow, 
OutputStreamCallback *callback
                        {
                                uint64_t appendSize = fs.tellp() - oldPos;
                                flow->_size += appendSize;
+                               /*
                                _logger->log_debug("Append offset %d extra 
length %d to new size %d into content %s for FlowFile UUID %s",
-                                               flow->_offset, appendSize, 
flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+                                               flow->_offset, appendSize, 
flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); 
*/
                                fs.close();
                        }
                        else
@@ -321,8 +323,9 @@ void ProcessSession::read(FlowFileRecord *flow, 
InputStreamCallback *callback)
                        if (fs.good())
                        {
                                callback->process(&fs);
+                               /*
                                _logger->log_debug("Read offset %d size %d 
content %s for FlowFile UUID %s",
-                                               flow->_offset, flow->_size, 
claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str());
+                                               flow->_offset, flow->_size, 
claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
                                fs.close();
                        }
                        else
@@ -348,6 +351,94 @@ void ProcessSession::read(FlowFileRecord *flow, 
InputStreamCallback *callback)
        }
 }
 
+void ProcessSession::import(std::string source, FlowFileRecord *flow)
+{
+       ResourceClaim *claim = NULL;
+
+       claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY);
+       char *buf = NULL;
+       int size = 4096;
+       buf = new char [size];
+
+       try
+       {
+               std::ofstream fs;
+               fs.open(claim->getContentFullPath().c_str(), std::fstream::out 
| std::fstream::binary | std::fstream::trunc);
+               std::ifstream input;
+               input.open(source.c_str(), std::fstream::in | 
std::fstream::binary);
+
+               if (fs.is_open() && input.is_open())
+               {
+                       // Open the source file and stream to the flow file
+                       while (input.good())
+                       {
+                               input.read(buf, size);
+                               if (input)
+                                       fs.write(buf, size);
+                               else
+                                       fs.write(buf, input.gcount());
+                       }
+
+                       if (fs.good() && fs.tellp() >= 0)
+                       {
+                               flow->_size = fs.tellp();
+                               flow->_offset = 0;
+                               if (flow->_claim)
+                               {
+                                       // Remove the old claim
+                                       
flow->_claim->decreaseFlowFileRecordOwnedCount();
+                                       flow->_claim = NULL;
+                               }
+                               flow->_claim = claim;
+                               claim->increaseFlowFileRecordOwnedCount();
+                               /*
+                               _logger->log_debug("Import offset %d length %d 
into content %s for FlowFile UUID %s",
+                                               flow->_offset, flow->_size, 
flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */
+                               fs.close();
+                               input.close();
+                       }
+                       else
+                       {
+                               fs.close();
+                               input.close();
+                               throw Exception(FILE_OPERATION_EXCEPTION, "File 
Import Error");
+                       }
+               }
+               else
+               {
+                       throw Exception(FILE_OPERATION_EXCEPTION, "File Import 
Error");
+               }
+
+               delete[] buf;
+       }
+       catch (std::exception &exception)
+       {
+               if (flow && flow->_claim == claim)
+               {
+                       flow->_claim->decreaseFlowFileRecordOwnedCount();
+                       flow->_claim = NULL;
+               }
+               if (claim)
+                       delete claim;
+               _logger->log_debug("Caught Exception %s", exception.what());
+               delete[] buf;
+               throw;
+       }
+       catch (...)
+       {
+               if (flow && flow->_claim == claim)
+               {
+                       flow->_claim->decreaseFlowFileRecordOwnedCount();
+                       flow->_claim = NULL;
+               }
+               if (claim)
+                       delete claim;
+               _logger->log_debug("Caught Exception during process session 
write");
+               delete[] buf;
+               throw;
+       }
+}
+
 void ProcessSession::commit()
 {
        try

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/Processor.cpp
----------------------------------------------------------------------
diff --git a/src/Processor.cpp b/src/Processor.cpp
index 5c695af..d5e1a6a 100644
--- a/src/Processor.cpp
+++ b/src/Processor.cpp
@@ -40,7 +40,7 @@ Processor::Processor(std::string name, uuid_t uuid)
                uuid_copy(_uuid, uuid);
 
        char uuidStr[37];
-       uuid_parse(uuidStr, _uuid);
+       uuid_unparse(_uuid, uuidStr);
        _uuidStr = uuidStr;
 
        // Setup the default values
@@ -58,7 +58,7 @@ Processor::Processor(std::string name, uuid_t uuid)
        _incomingConnectionsIter = this->_incomingConnections.begin();
        _logger = Logger::getLogger();
 
-       _logger->log_info("Processor %s created", _name.c_str());
+       _logger->log_info("Processor %s created UUID %s", _name.c_str(), 
_uuidStr.c_str());
 }
 
 Processor::~Processor()
@@ -208,13 +208,6 @@ bool Processor::getProperty(std::string name, std::string 
&value)
 
 bool Processor::setProperty(std::string name, std::string value)
 {
-       if (isRunning())
-       {
-               _logger->log_info("Can not set processor property while the 
process %s is running",
-                               _name.c_str());
-               return false;
-       }
-
        std::lock_guard<std::mutex> lock(_mtx);
        std::map<std::string, Property>::iterator it = _properties.find(name);
        if (it != _properties.end())

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/SchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/src/SchedulingAgent.cpp b/src/SchedulingAgent.cpp
index 45f5d10..211c328 100644
--- a/src/SchedulingAgent.cpp
+++ b/src/SchedulingAgent.cpp
@@ -60,6 +60,7 @@ bool SchedulingAgent::onTrigger(Processor *processor)
        try
        {
                processor->onTrigger();
+               processor->decrementActiveTask();
        }
        catch (Exception &exception)
        {

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/TimerDrivenSchedulingAgent.cpp
----------------------------------------------------------------------
diff --git a/src/TimerDrivenSchedulingAgent.cpp 
b/src/TimerDrivenSchedulingAgent.cpp
index a5f8f3c..3ce57ae 100644
--- a/src/TimerDrivenSchedulingAgent.cpp
+++ b/src/TimerDrivenSchedulingAgent.cpp
@@ -74,7 +74,7 @@ void TimerDrivenSchedulingAgent::schedule(Processor 
*processor)
                _logger->log_info("Scheduled Time Driven thread %d running for 
process %s", thread->get_id(),
                                processor->getName().c_str());
        }
-       _threads[processor->getName().c_str()] = threads;
+       _threads[processor->getUUIDStr().c_str()] = threads;
 
        return;
 }
@@ -105,6 +105,7 @@ void TimerDrivenSchedulingAgent::unschedule(Processor 
*processor)
                delete thread;
        }
        _threads.erase(processor->getUUIDStr());
+       processor->clearActiveTask();
 
        return;
 }

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/target/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/target/conf/nifi.properties b/target/conf/nifi.properties
index c4f7dff..b1902b1 100644
--- a/target/conf/nifi.properties
+++ b/target/conf/nifi.properties
@@ -183,3 +183,9 @@ nifi.cluster.manager.safemode.duration=0 sec
 
 # kerberos #
 nifi.kerberos.krb5.file=
+
+# Server
+nifi.server.name=localhost
+nifi.server.port=9000
+nifi.server.report.interval=1000 ms
+

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/test/Server.cpp
----------------------------------------------------------------------
diff --git a/test/Server.cpp b/test/Server.cpp
new file mode 100644
index 0000000..e7b3452
--- /dev/null
+++ b/test/Server.cpp
@@ -0,0 +1,607 @@
+/* A simple server in the internet domain using TCP
+   The port number is passed as an argument */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/types.h> 
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <errno.h>
+#include <arpa/inet.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <string>
+#include <errno.h>
+#include <chrono>
+#include <thread>
+#include <iostream>     // std::cout
+#include <fstream>      // std::ifstream
+#include <signal.h>
+
+#define DEFAULT_NIFI_SERVER_PORT 9000
+#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec
+#define MAX_READ_TIMEOUT 30000 // 30 seconds
+
+//! FlowControl Protocol Msg Type
+typedef enum {
+       REGISTER_REQ, // Device Register Request from device to server which 
contain device serial number, current running flow xml version
+       REGISTER_RESP, // Device Register Respond from server to device, may 
contain new flow.xml from server ask device to apply and also device report 
interval
+       REPORT_REQ, // Period Device Report from device to server which contain 
device serial number, current running flow xml name/version and other period 
report info
+       REPORT_RESP, // Report Respond from server to device, may ask device to 
update flow xml or processor property
+       MAX_FLOW_CONTROL_MSG_TYPE
+} FlowControlMsgType;
+
+//! FlowControl Protocol Msg Type String
+static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] =
+{
+               "REGISTER_REQ",
+               "REGISTER_RESP",
+               "REPORT_REQ",
+               "REPORT_RESP"
+};
+
+//! Flow Control Msg Type to String
+inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type)
+{
+       if (type < MAX_FLOW_CONTROL_MSG_TYPE)
+               return FlowControlMsgTypeStr[type];
+       else
+               return NULL;
+}
+
+//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are 
variable length (TLV)
+typedef enum {
+       //Fix length 8 bytes: client to server in register request, required 
field
+       FLOW_SERIAL_NUMBER,
+       // Flow XML name TLV: client to server in register request and report 
request, required field
+       FLOW_XML_NAME,
+       // Flow XML content, TLV: server to client in register respond, option 
field in case server want to ask client to load xml from server
+       FLOW_XML_CONTENT,
+       // Fix length, 4 bytes Report interval in msec: server to client in 
register respond, option field
+       REPORT_INTERVAL,
+       // Processor Name TLV:  server to client in report respond, option 
field in case server want to ask client to update processor property
+       PROCESSOR_NAME,
+       // Processor Property Name TLV: server to client in report respond, 
option field in case server want to ask client to update processor property
+       PROPERTY_NAME,
+       // Processor Property Value TLV: server to client in report respond, 
option field in case server want to ask client to update processor property
+       PROPERTY_VALUE,
+       // Report Blob TLV: client to server in report request, option field in 
case client want to pickyback the report blob in report request to server
+       REPORT_BLOB,
+       MAX_FLOW_MSG_ID
+} FlowControlMsgID;
+
+//! FlowControl Protocol Msg ID String
+static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] =
+{
+               "FLOW_SERIAL_NUMBER",
+               "FLOW_XML_NAME",
+               "FLOW_XML_CONTENT",
+               "REPORT_INTERVAL",
+               "PROCESSOR_NAME"
+               "PROPERTY_NAME",
+               "PROPERTY_VALUE",
+               "REPORT_BLOB"
+};
+
+#define TYPE_HDR_LEN 4 // Fix Hdr Type
+#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes
+
+//! FlowControl Protocol Msg Len
+inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen)
+{
+       if (id == FLOW_SERIAL_NUMBER)
+               return (TYPE_HDR_LEN + 8);
+       else if (id == REPORT_INTERVAL)
+               return (TYPE_HDR_LEN + 4);
+       else if (id < MAX_FLOW_MSG_ID)
+               return (TLV_HDR_LEN + payLoadLen);
+       else
+               return -1;
+}
+
+//! Flow Control Msg Id to String
+inline const char *FlowControlMsgIdToStr(FlowControlMsgID id)
+{
+       if (id < MAX_FLOW_MSG_ID)
+               return FlowControlMsgIDStr[id];
+       else
+               return NULL;
+}
+
+//! Flow Control Respond status code
+typedef enum {
+       RESP_SUCCESS,
+       RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger 
register
+       RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow 
controller
+       RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow 
controller
+       RESP_FAILURE,
+       MAX_RESP_CODE
+} FlowControlRespCode;
+
+//! FlowControl Resp Code str
+static const char *FlowControlRespCodeStr[MAX_RESP_CODE] =
+{
+               "RESP_SUCCESS",
+               "RESP_TRIGGER_REGISTER",
+               "RESP_START_FLOW_CONTROLLER",
+               "RESP_STOP_FLOW_CONTROLLER",
+               "RESP_FAILURE"
+};
+
+//! Flow Control Resp Code to String
+inline const char *FlowControlRespCodeToStr(FlowControlRespCode code)
+{
+       if (code < MAX_RESP_CODE)
+               return FlowControlRespCodeStr[code];
+       else
+               return NULL;
+}
+
+//! Common FlowControlProtocol Header
+typedef struct {
+       uint32_t msgType; //! Msg Type
+       uint32_t seqNumber; //! Seq Number to match Req with Resp
+       uint32_t status; //! Resp Code, see FlowControlRespCode
+       uint32_t payloadLen; //! Msg Payload length
+} FlowControlProtocolHeader;
+
+
+//! encode uint32_t
+uint8_t *encode(uint8_t *buf, uint32_t value)
+{
+               *buf++ = (value & 0xFF000000) >> 24;
+               *buf++ = (value & 0x00FF0000) >> 16;
+               *buf++ = (value & 0x0000FF00) >> 8;
+               *buf++ = (value & 0x000000FF);
+               return buf;
+}
+
+//! encode uint32_t
+uint8_t *decode(uint8_t *buf, uint32_t &value)
+{
+               value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3]));
+               return (buf + 4);
+}
+
+//! encode byte array
+uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size)
+{
+               memcpy(buf, bufArray, size);
+               buf += size;
+               return buf;
+}
+
+//! encode std::string
+uint8_t *encode(uint8_t *buf, std::string value)
+{
+               // add the \0 for size
+               buf = encode(buf, value.size()+1);
+               buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1);
+               return buf;
+}
+
+int sendData(int socket, uint8_t *buf, int buflen)
+{
+       int ret = 0, bytes = 0;
+
+       while (bytes < buflen)
+       {
+               ret = send(socket, buf+bytes, buflen-bytes, 0);
+               //check for errors
+               if (ret == -1)
+               {
+                       return ret;
+               }
+               bytes+=ret;
+       }
+
+       return bytes;
+}
+
+void error(const char *msg)
+{
+    perror(msg);
+    exit(1);
+}
+
+/* readline - read a '\n' terminated line from socket fd 
+              into buffer bufptr of size len. The line in the
+              buffer is terminated with '\0'.
+              It returns -1 in case of error or if
+              the capacity of the buffer is exceeded.
+             It returns 0 if EOF is encountered before reading '\n'.
+ */
+int readline( int fd, char *bufptr, size_t len )
+{
+  /* Note that this function is very tricky.  It uses the
+     static variables bp, cnt, and b to establish a local buffer.
+     The recv call requests large chunks of data (the size of the buffer).
+     Then if the recv call reads more than one line, the overflow
+     remains in the buffer and it is made available to the next call
+     to readline. 
+     Notice also that this routine reads up to '\n' and overwrites
+     it with '\0'. Thus if the line is really terminated with
+     "\r\n", the '\r' will remain unchanged.
+  */
+  char *bufx = bufptr;
+  static char *bp;
+  static int cnt = 0;
+  static char b[ 4096 ];
+  char c;
+  
+  while ( --len > 0 )
+    {
+      if ( --cnt <= 0 )
+       {
+         cnt = recv( fd, b, sizeof( b ), 0 );
+         if ( cnt < 0 )
+           {
+             if ( errno == EINTR )
+               {
+                 len++;                /* the while will decrement */
+                 continue;
+               }
+             return -1;
+           }
+         if ( cnt == 0 )
+           return 0;
+         bp = b;
+       }
+      c = *bp++;
+      *bufptr++ = c;
+      if ( c == '\n' )
+       {
+         *bufptr = '\0';
+         return bufptr - bufx;
+       }
+    }
+  return -1;
+}
+
+int readData(int socket, uint8_t *buf, int buflen)
+{
+       int sendSize = buflen;
+       int status;
+
+       while (buflen)
+       {
+#ifndef __MACH__
+               status = read(socket, buf, buflen);
+#else
+               status = recv(socket, buf, buflen, 0);
+#endif
+               if (status <= 0)
+               {
+                       return status;
+               }
+               buflen -= status;
+               buf += status;
+       }
+
+       return sendSize;
+}
+
+int readHdr(int socket, FlowControlProtocolHeader *hdr)
+{
+       uint8_t buffer[sizeof(FlowControlProtocolHeader)];
+
+       uint8_t *data = buffer;
+
+       int status = readData(socket, buffer, 
sizeof(FlowControlProtocolHeader));
+       if (status <= 0)
+               return status;
+
+       uint32_t value;
+       data = decode(data, value);
+       hdr->msgType = value;
+
+       data = decode(data, value);
+       hdr->seqNumber = value;
+
+       data = decode(data, value);
+       hdr->status = value;
+
+       data = decode(data, value);
+       hdr->payloadLen = value;
+
+       return sizeof(FlowControlProtocolHeader);
+}
+
+int readXML(char **xmlContent)
+{
+         std::ifstream is ("conf/flowServer.xml", std::ifstream::binary);
+         if (is) {
+           // get length of file:
+           is.seekg (0, is.end);
+           int length = is.tellg();
+           is.seekg (0, is.beg);
+
+           char * buffer = new char [length];
+
+           printf("Reading %s len %d\n", "conf/flowServer.xml", length);
+           // read data as a block:
+           is.read (buffer,length);
+
+           is.close();
+
+           // ...buffer contains the entire file...
+           *xmlContent = buffer;
+
+           return length;
+         }
+         return 0;
+}
+
+static int sockfd = 0, newsockfd = 0;
+void sigHandler(int signal)
+{
+       if (signal == SIGINT || signal == SIGTERM)
+       {
+               close(newsockfd);
+               close(sockfd);
+               exit(1);
+       }
+}
+
+int main(int argc, char *argv[])
+{
+     int portno;
+     socklen_t clilen;
+     struct sockaddr_in serv_addr, cli_addr;
+     char buffer[4096];
+     int flag = 0;
+     int number = 0;
+     
+     int n;
+     if (argc < 2) {
+         fprintf(stderr,"ERROR, no port provided\n");
+         exit(1);
+     }
+
+        if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, 
sigHandler) == SIG_ERR)
+        {
+
+               return -1;
+        }
+     sockfd = socket(AF_INET, SOCK_STREAM, 0);
+     if (sockfd < 0) 
+        error("ERROR opening socket");
+     bzero((char *) &serv_addr, sizeof(serv_addr));
+     portno = atoi(argv[1]);
+     serv_addr.sin_family = AF_INET;
+     serv_addr.sin_addr.s_addr = INADDR_ANY;
+     serv_addr.sin_port = htons(portno);
+     if (bind(sockfd, (struct sockaddr *) &serv_addr,
+              sizeof(serv_addr)) < 0) 
+              error("ERROR on binding");
+     listen(sockfd,5);
+     if (portno == DEFAULT_NIFI_SERVER_PORT)
+     {
+        while (true)
+        {
+                clilen = sizeof(cli_addr);
+                newsockfd = accept(sockfd,
+                 (struct sockaddr *) &cli_addr, 
+                 &clilen);
+                if (newsockfd < 0)
+                {
+                        error("ERROR on accept");
+                        break;
+                }
+                // process request
+                FlowControlProtocolHeader hdr;
+                int status = readHdr(newsockfd, &hdr);
+                if (status > 0)
+                {
+                        printf("Flow Control Protocol receive MsgType %s\n", 
FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType));
+                    printf("Flow Control Protocol receive Seq Num %d\n", 
hdr.seqNumber);
+                    printf("Flow Control Protocol receive Resp Code %s\n", 
FlowControlRespCodeToStr((FlowControlRespCode) hdr.status));
+                    printf("Flow Control Protocol receive Payload len %d\n", 
hdr.payloadLen);
+                        if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ)
+                        {
+                               printf("Flow Control Protocol Register Req 
receive\n");
+                               uint8_t *payload = new uint8_t[hdr.payloadLen];
+                               uint8_t *payloadPtr = payload;
+                               status = readData(newsockfd, payload, 
hdr.payloadLen);
+                               while (status > 0 && payloadPtr < (payload + 
hdr.payloadLen))
+                               {
+                                       uint32_t msgID = 0xFFFFFFFF;
+                                       payloadPtr = decode(payloadPtr, msgID);
+                                       if (((FlowControlMsgID) msgID) == 
FLOW_SERIAL_NUMBER)
+                                       {
+                                               // Fixed 8 bytes
+                                               uint8_t seqNum[8];
+                                               memcpy(seqNum, payloadPtr, 8);
+                                               printf("Flow Control Protocol 
Register Req receive serial num\n");
+                                               payloadPtr += 8;
+                                       }
+                                       else if (((FlowControlMsgID) msgID) == 
FLOW_XML_NAME)
+                                       {
+                                               uint32_t len;
+                                               payloadPtr = decode(payloadPtr, 
len);
+                                               printf("Flow Control Protocol 
receive XML name length %d\n", len);
+                                               std::string flowName = (const 
char *) payloadPtr;
+                                               payloadPtr += len;
+                                               printf("Flow Control Protocol 
receive XML name %s\n", flowName.c_str());
+                                       }
+                                       else
+                                       {
+                                               break;
+                                       }
+                               }
+                               delete[] payload;
+                               // Send Register Respond
+                               // Calculate the total payload msg size
+                               char *xmlContent;
+                               uint32_t xmlLen = readXML(&xmlContent);
+                               uint32_t payloadSize = 
FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0);
+                               if (xmlLen > 0)
+                                       payloadSize += 
FlowControlMsgIDEncodingLen(FLOW_XML_CONTENT, xmlLen);
+
+                               uint32_t size = 
sizeof(FlowControlProtocolHeader) + payloadSize;
+                               uint8_t *data = new uint8_t[size];
+                               uint8_t *start = data;
+
+                               // encode the HDR
+                               hdr.msgType = REGISTER_RESP;
+                               hdr.payloadLen = payloadSize;
+                               hdr.status = RESP_SUCCESS;
+                               data = encode(data, hdr.msgType);
+                               data = encode(data, hdr.seqNumber);
+                               data = encode(data, hdr.status);
+                               data = encode(data, hdr.payloadLen);
+
+                               // encode the report interval
+                               data = encode(data, REPORT_INTERVAL);
+                               data = encode(data, DEFAULT_REPORT_INTERVAL);
+
+                               // encode the XML content
+                               if (xmlLen > 0)
+                               {
+                                       data = encode(data, FLOW_XML_CONTENT);
+                                       data = encode(data, xmlLen);
+                                       data = encode(data, (uint8_t *) 
xmlContent, xmlLen);
+                                       delete[] xmlContent;
+                               }
+
+                               // send it
+                               status = sendData(newsockfd, start, size);
+                               delete[] start;
+                        }
+                        else if (((FlowControlMsgType) hdr.msgType) == 
REPORT_REQ)
+                        {
+                                       printf("Flow Control Protocol Report 
Req receive\n");
+                                       uint8_t *payload = new 
uint8_t[hdr.payloadLen];
+                                       uint8_t *payloadPtr = payload;
+                                       status = readData(newsockfd, payload, 
hdr.payloadLen);
+                                       while (status > 0 && payloadPtr < 
(payload + hdr.payloadLen))
+                                       {
+                                               uint32_t msgID = 0xFFFFFFFF;
+                                               payloadPtr = decode(payloadPtr, 
msgID);
+                                               if (((FlowControlMsgID) msgID) 
== FLOW_XML_NAME)
+                                               {
+                                                       uint32_t len;
+                                                       payloadPtr = 
decode(payloadPtr, len);
+                                                       printf("Flow Control 
Protocol receive XML name length %d\n", len);
+                                                       std::string flowName = 
(const char *) payloadPtr;
+                                                       payloadPtr += len;
+                                                       printf("Flow Control 
Protocol receive XML name %s\n", flowName.c_str());
+                                               }
+                                               else
+                                               {
+                                                       break;
+                                               }
+                                       }
+                                       delete[] payload;
+                                       // Send Register Respond
+                                       // Calculate the total payload msg size
+                                       std::string processor = 
"RealTimeDataCollector";
+                                       std::string propertyName1 = "real Time 
Message ID";
+                                       std::string propertyValue1 = "41";
+                                       std::string propertyName2 = "Batch 
Message ID";
+                                       std::string propertyValue2 = 
"172,30,48";
+                                       if (flag == 0)
+                                       {
+                                       propertyName1 = "Real Time Message ID";
+                                       propertyValue1 = "41";
+                                   propertyName2 = "Batch Message ID";
+                                   propertyValue2 = "172,48";
+                                               flag = 1;
+                                       }
+                                       else if (flag == 1)
+                                       {
+                                               propertyName1 = "Real Time 
Message ID";
+                                               propertyValue1 = "172,48";
+                                               propertyName2 = "Batch Message 
ID";
+                                               propertyValue2 = "41";
+                                               flag = 0;
+                                       }
+                                       uint32_t payloadSize = 
FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size()+1);
+                                       payloadSize += 
FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size()+1);
+                                       payloadSize += 
FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size()+1);
+                                       payloadSize += 
FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size()+1);
+                                       payloadSize += 
FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size()+1);
+
+                                       uint32_t size = 
sizeof(FlowControlProtocolHeader) + payloadSize;
+                                       uint8_t *data = new uint8_t[size];
+                                       uint8_t *start = data;
+
+                                       // encode the HDR
+                                       hdr.msgType = REPORT_RESP;
+                                       hdr.payloadLen = payloadSize;
+                                       hdr.status = RESP_SUCCESS;
+
+                                       if (number >= 10 && number < 20)
+                                   {
+                                       // After 10 second report, stop the 
flow controller for 10 second
+                                      hdr.status = RESP_STOP_FLOW_CONTROLLER;
+                                   }
+                                       else if (number == 20)
+                                       {
+                                               // restart the flow controller 
after 10 second
+                                               hdr.status = 
RESP_START_FLOW_CONTROLLER;
+                                       }
+                                       else if (number == 30)
+                                       {
+                                               // retrigger register
+                                               hdr.status = 
RESP_TRIGGER_REGISTER;
+                                               number = 0;
+                                       }
+
+                                   number++;
+
+                                       data = encode(data, hdr.msgType);
+                                       data = encode(data, hdr.seqNumber);
+                                       data = encode(data, hdr.status);
+                                       data = encode(data, hdr.payloadLen);
+
+                                       // encode the processorName
+                                       data = encode(data, PROCESSOR_NAME);
+                                       data = encode(data, processor);
+
+                                       // encode the propertyName and value TLV
+                                       data = encode(data, PROPERTY_NAME);
+                               data = encode(data, propertyName1);
+                               data = encode(data, PROPERTY_VALUE);
+                               data = encode(data, propertyValue1);
+                               data = encode(data, PROPERTY_NAME);
+                               data = encode(data, propertyName2);
+                               data = encode(data, PROPERTY_VALUE);
+                               data = encode(data, propertyValue2);
+                                       // send it
+                                       status = sendData(newsockfd, start, 
size);
+                                       delete[] start;
+                                }
+                }
+                close(newsockfd);
+        }
+        close(sockfd);
+     }
+     else
+     {
+        clilen = sizeof(cli_addr);
+        newsockfd = accept(sockfd,
+                       (struct sockaddr *) &cli_addr,
+                        &clilen);
+        if (newsockfd < 0)
+               error("ERROR on accept");
+        while (1)
+        {
+               bzero(buffer,4096);
+               n = readline(newsockfd,buffer,4095);
+               if (n <= 0 )
+               {
+                       close(newsockfd);
+                       newsockfd = accept(sockfd,
+                                               (struct sockaddr *) &cli_addr,
+                                                &clilen);
+                       continue;
+               }
+               printf("%s",buffer);
+         }
+         close(newsockfd);
+         close(sockfd);
+     }
+     return 0; 
+}

http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/thirdparty/libxml2/AUTHORS
----------------------------------------------------------------------
diff --git a/thirdparty/libxml2/AUTHORS b/thirdparty/libxml2/AUTHORS
new file mode 100644
index 0000000..cf2e9a6
--- /dev/null
+++ b/thirdparty/libxml2/AUTHORS
@@ -0,0 +1,5 @@
+Daniel Veillard <dan...@veillard.com>
+Bjorn Reese <bre...@users.sourceforge.net>
+William Brack <wbr...@mmm.com.hk>
+Igor Zlatkovic <i...@zlatkovic.com> for the Windows port
+Aleksey Sanin <alek...@aleksey.com>

Reply via email to