http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowControlProtocol.cpp ---------------------------------------------------------------------- diff --git a/src/FlowControlProtocol.cpp b/src/FlowControlProtocol.cpp new file mode 100644 index 0000000..6aaa969 --- /dev/null +++ b/src/FlowControlProtocol.cpp @@ -0,0 +1,540 @@ +/** + * @file FlowControlProtocol.cpp + * FlowControlProtocol class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <sys/time.h> +#include <stdio.h> +#include <time.h> +#include <chrono> +#include <thread> +#include <random> +#include <netinet/tcp.h> +#include <iostream> +#include "FlowController.h" +#include "FlowControlProtocol.h" + +int FlowControlProtocol::connectServer(const char *host, uint16_t port) +{ + in_addr_t addr; + int sock = 0; + struct hostent *h; +#ifdef __MACH__ + h = gethostbyname(host); +#else + char buf[1024]; + struct hostent he; + int hh_errno; + gethostbyname_r(host, &he, buf, sizeof(buf), &h, &hh_errno); +#endif + memcpy((char *) &addr, h->h_addr_list[0], h->h_length); + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) + { + _logger->log_error("Could not create socket to hostName %s", host); + return 0; + } + +#ifndef __MACH__ + int opt = 1; + bool nagle_off = true; + + if (nagle_off) + { + if (setsockopt(sock, SOL_TCP, TCP_NODELAY, (void *)&opt, sizeof(opt)) < 0) + { + _logger->log_error("setsockopt() TCP_NODELAY failed"); + close(sock); + return 0; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, + (char *)&opt, sizeof(opt)) < 0) + { + _logger->log_error("setsockopt() SO_REUSEADDR failed"); + close(sock); + return 0; + } + } + + int sndsize = 256*1024; + if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF, (char *)&sndsize, (int)sizeof(sndsize)) < 0) + { + _logger->log_error("setsockopt() SO_SNDBUF failed"); + close(sock); + return 0; + } +#endif + + struct sockaddr_in sa; + socklen_t socklen; + int status; + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = htonl(INADDR_ANY); + sa.sin_port = htons(0); + socklen = sizeof(sa); + if (bind(sock, (struct sockaddr *)&sa, socklen) < 0) + { + _logger->log_error("socket bind failed"); + close(sock); + return 0; + } + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = addr; + sa.sin_port = htons(port); + socklen = sizeof(sa); + + status = connect(sock, (struct sockaddr *)&sa, socklen); + + if (status < 0) + { + _logger->log_error("socket connect failed to %s %d", host, port); + close(sock); + return 0; + } + + _logger->log_info("Flow Control Protocol socket %d connect to server %s port %d success", sock, host, port); + + return sock; +} + +int FlowControlProtocol::sendData(uint8_t *buf, int buflen) +{ + int ret = 0, bytes = 0; + + while (bytes < buflen) + { + ret = send(_socket, buf+bytes, buflen-bytes, 0); + //check for errors + if (ret == -1) + { + return ret; + } + bytes+=ret; + } + + return bytes; +} + +int FlowControlProtocol::selectClient(int msec) +{ + fd_set fds; + struct timeval tv; + int retval; + int fd = _socket; + + FD_ZERO(&fds); + FD_SET(fd, &fds); + + tv.tv_sec = msec/1000; + tv.tv_usec = (msec % 1000) * 1000; + + if (msec > 0) + retval = select(fd+1, &fds, NULL, NULL, &tv); + else + retval = select(fd+1, &fds, NULL, NULL, NULL); + + if (retval <= 0) + return retval; + if (FD_ISSET(fd, &fds)) + return retval; + else + return 0; +} + +int FlowControlProtocol::readData(uint8_t *buf, int buflen) +{ + int sendSize = buflen; + + while (buflen) + { + int status; + status = selectClient(MAX_READ_TIMEOUT); + if (status <= 0) + { + return status; + } +#ifndef __MACH__ + status = read(_socket, buf, buflen); +#else + status = recv(_socket, buf, buflen, 0); +#endif + if (status <= 0) + { + return status; + } + buflen -= status; + buf += status; + } + + return sendSize; +} + +int FlowControlProtocol::readHdr(FlowControlProtocolHeader *hdr) +{ + uint8_t buffer[sizeof(FlowControlProtocolHeader)]; + + uint8_t *data = buffer; + + int status = readData(buffer, sizeof(FlowControlProtocolHeader)); + if (status <= 0) + return status; + + uint32_t value; + data = this->decode(data, value); + hdr->msgType = value; + + data = this->decode(data, value); + hdr->seqNumber = value; + + data = this->decode(data, value); + hdr->status = value; + + data = this->decode(data, value); + hdr->payloadLen = value; + + return sizeof(FlowControlProtocolHeader); +} + +void FlowControlProtocol::start() +{ + if (_running) + return; + _running = true; + _logger->log_info("FlowControl Protocol Start"); + _thread = new std::thread(run, this); + _thread->detach(); +} + +void FlowControlProtocol::stop() +{ + if (!_running) + return; + _running = false; + _logger->log_info("FlowControl Protocol Stop"); + delete _thread; +} + +void FlowControlProtocol::run(FlowControlProtocol *protocol) +{ + while (protocol->_running) + { + std::this_thread::sleep_for(std::chrono::milliseconds(protocol->_reportInterval)); + if (!protocol->_registered) + { + // if it is not register yet + protocol->sendRegisterReq(); + // protocol->_controller->reload("flow.xml"); + } + else + protocol->sendReportReq(); + } + return; +} + +int FlowControlProtocol::sendRegisterReq() +{ + if (_registered) + { + _logger->log_info("Already registered"); + return -1; + } + + uint16_t port = this->_serverPort; + + if (this->_socket <= 0) + this->_socket = connectServer(_serverName.c_str(), port); + + if (this->_socket <= 0) + return -1; + + // Calculate the total payload msg size + uint32_t payloadSize = FlowControlMsgIDEncodingLen(FLOW_SERIAL_NUMBER, 0) + + FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1); + uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; + + uint8_t *data = new uint8_t[size]; + uint8_t *start = data; + + // encode the HDR + FlowControlProtocolHeader hdr; + hdr.msgType = REGISTER_REQ; + hdr.payloadLen = payloadSize; + hdr.seqNumber = this->_seqNumber; + hdr.status = RESP_SUCCESS; + data = this->encode(data, hdr.msgType); + data = this->encode(data, hdr.seqNumber); + data = this->encode(data, hdr.status); + data = this->encode(data, hdr.payloadLen); + + // encode the serial number + data = this->encode(data, FLOW_SERIAL_NUMBER); + data = this->encode(data, this->_serialNumber, 8); + + // encode the XML name + data = this->encode(data, FLOW_XML_NAME); + data = this->encode(data, this->_controller->getName()); + + // send it + int status = sendData(start, size); + delete[] start; + if (status <= 0) + { + close(_socket); + _socket = 0; + _logger->log_error("Flow Control Protocol Send Register Req failed"); + return -1; + } + + // Looking for register respond + status = readHdr(&hdr); + + if (status <= 0) + { + close(_socket); + _socket = 0; + _logger->log_error("Flow Control Protocol Read Register Resp header failed"); + return -1; + } + _logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); + _logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); + _logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); + _logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); + + if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) + { + this->_registered = true; + this->_seqNumber++; + _logger->log_info("Flow Control Protocol Register success"); + uint8_t *payload = new uint8_t[hdr.payloadLen]; + uint8_t *payloadPtr = payload; + status = readData(payload, hdr.payloadLen); + if (status <= 0) + { + delete[] payload; + _logger->log_info("Flow Control Protocol Register Read Payload fail"); + close(_socket); + _socket = 0; + return -1; + } + while (payloadPtr < (payload + hdr.payloadLen)) + { + uint32_t msgID; + payloadPtr = this->decode(payloadPtr, msgID); + if (((FlowControlMsgID) msgID) == REPORT_INTERVAL) + { + // Fixed 4 bytes + uint32_t reportInterval; + payloadPtr = this->decode(payloadPtr, reportInterval); + _logger->log_info("Flow Control Protocol receive report interval %d ms", reportInterval); + this->_reportInterval = reportInterval; + } + else if (((FlowControlMsgID) msgID) == FLOW_XML_CONTENT) + { + uint32_t xmlLen; + payloadPtr = this->decode(payloadPtr, xmlLen); + _logger->log_info("Flow Control Protocol receive XML content length %d", xmlLen); + time_t rawtime; + struct tm *timeinfo; + time(&rawtime); + timeinfo = localtime(&rawtime); + std::string xmlFileName = "flow."; + xmlFileName += asctime(timeinfo); + xmlFileName += ".xml"; + std::ofstream fs; + fs.open(xmlFileName.c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); + if (fs.is_open()) + { + fs.write((const char *)payloadPtr, xmlLen); + fs.close(); + this->_controller->reload(xmlFileName.c_str()); + } + } + else + { + break; + } + } + delete[] payload; + close(_socket); + _socket = 0; + return 0; + } + else + { + _logger->log_info("Flow Control Protocol Register fail"); + close(_socket); + _socket = 0; + return -1; + } +} + + +int FlowControlProtocol::sendReportReq() +{ + uint16_t port = this->_serverPort; + + if (this->_socket <= 0) + this->_socket = connectServer(_serverName.c_str(), port); + + if (this->_socket <= 0) + return -1; + + // Calculate the total payload msg size + uint32_t payloadSize = + FlowControlMsgIDEncodingLen(FLOW_XML_NAME, this->_controller->getName().size()+1); + uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; + + uint8_t *data = new uint8_t[size]; + uint8_t *start = data; + + // encode the HDR + FlowControlProtocolHeader hdr; + hdr.msgType = REPORT_REQ; + hdr.payloadLen = payloadSize; + hdr.seqNumber = this->_seqNumber; + hdr.status = RESP_SUCCESS; + data = this->encode(data, hdr.msgType); + data = this->encode(data, hdr.seqNumber); + data = this->encode(data, hdr.status); + data = this->encode(data, hdr.payloadLen); + + // encode the XML name + data = this->encode(data, FLOW_XML_NAME); + data = this->encode(data, this->_controller->getName()); + + // send it + int status = sendData(start, size); + delete[] start; + if (status <= 0) + { + close(_socket); + _socket = 0; + _logger->log_error("Flow Control Protocol Send Report Req failed"); + return -1; + } + + // Looking for report respond + status = readHdr(&hdr); + + if (status <= 0) + { + close(_socket); + _socket = 0; + _logger->log_error("Flow Control Protocol Read Report Resp header failed"); + return -1; + } + _logger->log_info("Flow Control Protocol receive MsgType %s", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); + _logger->log_info("Flow Control Protocol receive Seq Num %d", hdr.seqNumber); + _logger->log_info("Flow Control Protocol receive Resp Code %s", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); + _logger->log_info("Flow Control Protocol receive Payload len %d", hdr.payloadLen); + + if (hdr.status == RESP_SUCCESS && hdr.seqNumber == this->_seqNumber) + { + this->_seqNumber++; + uint8_t *payload = new uint8_t[hdr.payloadLen]; + uint8_t *payloadPtr = payload; + status = readData(payload, hdr.payloadLen); + if (status <= 0) + { + delete[] payload; + _logger->log_info("Flow Control Protocol Report Resp Read Payload fail"); + close(_socket); + _socket = 0; + return -1; + } + std::string processor; + std::string propertyName; + std::string propertyValue; + while (payloadPtr < (payload + hdr.payloadLen)) + { + uint32_t msgID; + payloadPtr = this->decode(payloadPtr, msgID); + if (((FlowControlMsgID) msgID) == PROCESSOR_NAME) + { + uint32_t len; + payloadPtr = this->decode(payloadPtr, len); + processor = (const char *) payloadPtr; + payloadPtr += len; + _logger->log_info("Flow Control Protocol receive report resp processor %s", processor.c_str()); + } + else if (((FlowControlMsgID) msgID) == PROPERTY_NAME) + { + uint32_t len; + payloadPtr = this->decode(payloadPtr, len); + propertyName = (const char *) payloadPtr; + payloadPtr += len; + _logger->log_info("Flow Control Protocol receive report resp property name %s", propertyName.c_str()); + } + else if (((FlowControlMsgID) msgID) == PROPERTY_VALUE) + { + uint32_t len; + payloadPtr = this->decode(payloadPtr, len); + propertyValue = (const char *) payloadPtr; + payloadPtr += len; + _logger->log_info("Flow Control Protocol receive report resp property value %s", propertyValue.c_str()); + this->_controller->updatePropertyValue(processor, propertyName, propertyValue); + } + else + { + break; + } + } + delete[] payload; + close(_socket); + _socket = 0; + return 0; + } + else if (hdr.status == RESP_TRIGGER_REGISTER && hdr.seqNumber == this->_seqNumber) + { + _logger->log_info("Flow Control Protocol trigger reregister"); + this->_registered = false; + this->_seqNumber++; + close(_socket); + _socket = 0; + return 0; + } + else if (hdr.status == RESP_STOP_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) + { + _logger->log_info("Flow Control Protocol stop flow controller"); + this->_controller->stop(true); + this->_seqNumber++; + close(_socket); + _socket = 0; + return 0; + } + else if (hdr.status == RESP_START_FLOW_CONTROLLER && hdr.seqNumber == this->_seqNumber) + { + _logger->log_info("Flow Control Protocol start flow controller"); + this->_controller->start(); + this->_seqNumber++; + close(_socket); + _socket = 0; + return 0; + } + else + { + _logger->log_info("Flow Control Protocol Report fail"); + close(_socket); + _socket = 0; + return -1; + } +} +
http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowController.cpp ---------------------------------------------------------------------- diff --git a/src/FlowController.cpp b/src/FlowController.cpp index 0525ccc..a2cafbc 100644 --- a/src/FlowController.cpp +++ b/src/FlowController.cpp @@ -44,6 +44,7 @@ FlowController::FlowController(std::string name) _initialized = false; _root = NULL; _logger = Logger::getLogger(); + _protocol = new FlowControlProtocol(this); // NiFi config properties _configure = Configure::getConfigure(); @@ -58,6 +59,7 @@ FlowController::~FlowController() { stop(true); unload(); + delete _protocol; } bool FlowController::isRunning() @@ -75,6 +77,11 @@ void FlowController::stop(bool force) if (_running) { _logger->log_info("Stop Flow Controller"); + this->_timerScheduler.stop(); + // Wait for sometime for thread stop + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + if (this->_root) + this->_root->stopProcessing(&this->_timerScheduler); _running = false; } } @@ -88,12 +95,36 @@ void FlowController::unload() if (_initialized) { _logger->log_info("Unload Flow Controller"); + if (_root) + delete _root; + _root = NULL; _initialized = false; + _name = ""; } return; } +void FlowController::reload(std::string xmlFile) +{ + _logger->log_info("Starting to reload Flow Controller with xml %s", xmlFile.c_str()); + stop(true); + unload(); + std::string oldxmlFile = this->_xmlFileName; + this->_xmlFileName = xmlFile; + load(); + start(); + if (!this->_root) + { + this->_xmlFileName = oldxmlFile; + _logger->log_info("Rollback Flow Controller to xml %s", oldxmlFile.c_str()); + stop(true); + unload(); + load(); + start(); + } +} + Processor *FlowController::createProcessor(std::string name, uuid_t uuid) { Processor *processor = NULL; @@ -291,6 +322,7 @@ void FlowController::parseRootProcessGroup(xmlDoc *doc, xmlNode *node) } // Set the root process group this->_root = group; + this->_name = name; xmlFree(name); } } @@ -504,6 +536,20 @@ void FlowController::parseProcessorNode(xmlDoc *doc, xmlNode *processorNode, Pro xmlFree(temp); } } + else if (xmlStrcmp(currentNode->name, BAD_CAST "maxConcurrentTasks") == 0) + { + char *temp = (char *) xmlNodeGetContent(currentNode); + if (temp) + { + int64_t maxConcurrentTasks; + if (Property::StringToInt(temp, maxConcurrentTasks)) + { + _logger->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks); + processor->setMaxConcurrentTasks(maxConcurrentTasks); + } + xmlFree(temp); + } + } else if (xmlStrcmp(currentNode->name, BAD_CAST "runDurationNanos") == 0) { char *temp = (char *) xmlNodeGetContent(currentNode); @@ -548,12 +594,13 @@ void FlowController::load() } if (!_initialized) { - _logger->log_info("Load Flow Controller"); + _logger->log_info("Load Flow Controller from file %s", _xmlFileName.c_str()); xmlDoc *doc = xmlReadFile(_xmlFileName.c_str(), NULL, XML_PARSE_NONET); if (doc == NULL) { _logger->log_error("xmlReadFile returned NULL when reading [%s]", _xmlFileName.c_str()); + _initialized = true; return; } @@ -637,6 +684,7 @@ bool FlowController::start() if (this->_root) this->_root->startProcessing(&this->_timerScheduler); _running = true; + this->_protocol->start(); } return true; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/FlowFileRecord.cpp ---------------------------------------------------------------------- diff --git a/src/FlowFileRecord.cpp b/src/FlowFileRecord.cpp index 601dcde..2dda47a 100644 --- a/src/FlowFileRecord.cpp +++ b/src/FlowFileRecord.cpp @@ -22,6 +22,9 @@ #include <map> #include <sys/time.h> #include <time.h> +#include <iostream> +#include <fstream> +#include <cstdio> #include "FlowFileRecord.h" #include "Relationship.h" @@ -83,6 +86,7 @@ FlowFileRecord::~FlowFileRecord() if (_claim->getFlowFileRecordOwnedCount() == 0) { _logger->log_debug("Delete Resource Claim %s", _claim->getContentFullPath().c_str()); + std::remove(_claim->getContentFullPath().c_str()); delete _claim; } } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/LogAttribute.cpp ---------------------------------------------------------------------- diff --git a/src/LogAttribute.cpp b/src/LogAttribute.cpp index f4548af..67ed74e 100644 --- a/src/LogAttribute.cpp +++ b/src/LogAttribute.cpp @@ -25,6 +25,7 @@ #include <time.h> #include <sstream> #include <string.h> +#include <iostream> #include "TimeUtil.h" #include "LogAttribute.h" @@ -103,6 +104,24 @@ void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session) { message << "\n" << "Content Claim:" << claim->getContentFullPath(); } + if (logPayload && flow->getSize() <= 1024*1024) + { + message << "\n" << "Payload:" << "\n"; + ReadCallback callback(flow->getSize()); + session->read(flow, &callback); + for (int i = 0, j = 0; i < callback._readSize; i++) + { + char temp[8]; + sprintf(temp, "%02x ", (unsigned char) (callback._buffer[i])); + message << temp; + j++; + if (j == 16) + { + message << '\n'; + j = 0; + } + } + } message << "\n" << dashLine << std::ends; std::string output = message.str(); @@ -127,6 +146,12 @@ void LogAttribute::onTrigger(ProcessContext *context, ProcessSession *session) break; } + // Test Import + FlowFileRecord *importRecord = session->create(); + session->import(claim->getContentFullPath(), importRecord); + session->transfer(importRecord, Success); + + // Transfer to the relationship session->transfer(flow, Success); } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/ProcessGroup.cpp ---------------------------------------------------------------------- diff --git a/src/ProcessGroup.cpp b/src/ProcessGroup.cpp index 78ae1a4..a5fd773 100644 --- a/src/ProcessGroup.cpp +++ b/src/ProcessGroup.cpp @@ -46,7 +46,24 @@ ProcessGroup::ProcessGroup(ProcessGroupType type, std::string name, uuid_t uuid, ProcessGroup::~ProcessGroup() { + for (std::set<Connection *>::iterator it = _connections.begin(); it != _connections.end(); ++it) + { + Connection *connection = *it; + connection->drain(); + delete connection; + } + + for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) + { + ProcessGroup *processGroup(*it); + delete processGroup; + } + for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) + { + Processor *processor(*it); + delete processor; + } } bool ProcessGroup::isRootProcessGroup() @@ -198,6 +215,28 @@ Processor *ProcessGroup::findProcessor(uuid_t uuid) return ret; } +void ProcessGroup::updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) +{ + std::lock_guard<std::mutex> lock(_mtx); + + for (std::set<Processor *>::iterator it = _processors.begin(); it != _processors.end(); ++it) + { + Processor *processor(*it); + if (processor->getName() == processorName) + { + processor->setProperty(propertyName, propertyValue); + } + } + + for (std::set<ProcessGroup *>::iterator it = _childProcessGroups.begin(); it != _childProcessGroups.end(); ++it) + { + ProcessGroup *processGroup(*it); + processGroup->updatePropertyValue(processorName, propertyName, propertyValue); + } + + return; +} + void ProcessGroup::addConnection(Connection *connection) { std::lock_guard<std::mutex> lock(_mtx); http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/ProcessSession.cpp ---------------------------------------------------------------------- diff --git a/src/ProcessSession.cpp b/src/ProcessSession.cpp index cf8249d..32d8920 100644 --- a/src/ProcessSession.cpp +++ b/src/ProcessSession.cpp @@ -207,8 +207,9 @@ void ProcessSession::write(FlowFileRecord *flow, OutputStreamCallback *callback) } flow->_claim = claim; claim->increaseFlowFileRecordOwnedCount(); + /* _logger->log_debug("Write offset %d length %d into content %s for FlowFile UUID %s", - flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); + flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ fs.close(); } else @@ -273,8 +274,9 @@ void ProcessSession::append(FlowFileRecord *flow, OutputStreamCallback *callback { uint64_t appendSize = fs.tellp() - oldPos; flow->_size += appendSize; + /* _logger->log_debug("Append offset %d extra length %d to new size %d into content %s for FlowFile UUID %s", - flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); + flow->_offset, appendSize, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ fs.close(); } else @@ -321,8 +323,9 @@ void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback) if (fs.good()) { callback->process(&fs); + /* _logger->log_debug("Read offset %d size %d content %s for FlowFile UUID %s", - flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); + flow->_offset, flow->_size, claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ fs.close(); } else @@ -348,6 +351,94 @@ void ProcessSession::read(FlowFileRecord *flow, InputStreamCallback *callback) } } +void ProcessSession::import(std::string source, FlowFileRecord *flow) +{ + ResourceClaim *claim = NULL; + + claim = new ResourceClaim(DEFAULT_CONTENT_DIRECTORY); + char *buf = NULL; + int size = 4096; + buf = new char [size]; + + try + { + std::ofstream fs; + fs.open(claim->getContentFullPath().c_str(), std::fstream::out | std::fstream::binary | std::fstream::trunc); + std::ifstream input; + input.open(source.c_str(), std::fstream::in | std::fstream::binary); + + if (fs.is_open() && input.is_open()) + { + // Open the source file and stream to the flow file + while (input.good()) + { + input.read(buf, size); + if (input) + fs.write(buf, size); + else + fs.write(buf, input.gcount()); + } + + if (fs.good() && fs.tellp() >= 0) + { + flow->_size = fs.tellp(); + flow->_offset = 0; + if (flow->_claim) + { + // Remove the old claim + flow->_claim->decreaseFlowFileRecordOwnedCount(); + flow->_claim = NULL; + } + flow->_claim = claim; + claim->increaseFlowFileRecordOwnedCount(); + /* + _logger->log_debug("Import offset %d length %d into content %s for FlowFile UUID %s", + flow->_offset, flow->_size, flow->_claim->getContentFullPath().c_str(), flow->getUUIDStr().c_str()); */ + fs.close(); + input.close(); + } + else + { + fs.close(); + input.close(); + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + } + else + { + throw Exception(FILE_OPERATION_EXCEPTION, "File Import Error"); + } + + delete[] buf; + } + catch (std::exception &exception) + { + if (flow && flow->_claim == claim) + { + flow->_claim->decreaseFlowFileRecordOwnedCount(); + flow->_claim = NULL; + } + if (claim) + delete claim; + _logger->log_debug("Caught Exception %s", exception.what()); + delete[] buf; + throw; + } + catch (...) + { + if (flow && flow->_claim == claim) + { + flow->_claim->decreaseFlowFileRecordOwnedCount(); + flow->_claim = NULL; + } + if (claim) + delete claim; + _logger->log_debug("Caught Exception during process session write"); + delete[] buf; + throw; + } +} + void ProcessSession::commit() { try http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/Processor.cpp ---------------------------------------------------------------------- diff --git a/src/Processor.cpp b/src/Processor.cpp index 5c695af..d5e1a6a 100644 --- a/src/Processor.cpp +++ b/src/Processor.cpp @@ -40,7 +40,7 @@ Processor::Processor(std::string name, uuid_t uuid) uuid_copy(_uuid, uuid); char uuidStr[37]; - uuid_parse(uuidStr, _uuid); + uuid_unparse(_uuid, uuidStr); _uuidStr = uuidStr; // Setup the default values @@ -58,7 +58,7 @@ Processor::Processor(std::string name, uuid_t uuid) _incomingConnectionsIter = this->_incomingConnections.begin(); _logger = Logger::getLogger(); - _logger->log_info("Processor %s created", _name.c_str()); + _logger->log_info("Processor %s created UUID %s", _name.c_str(), _uuidStr.c_str()); } Processor::~Processor() @@ -208,13 +208,6 @@ bool Processor::getProperty(std::string name, std::string &value) bool Processor::setProperty(std::string name, std::string value) { - if (isRunning()) - { - _logger->log_info("Can not set processor property while the process %s is running", - _name.c_str()); - return false; - } - std::lock_guard<std::mutex> lock(_mtx); std::map<std::string, Property>::iterator it = _properties.find(name); if (it != _properties.end()) http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/SchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/src/SchedulingAgent.cpp b/src/SchedulingAgent.cpp index 45f5d10..211c328 100644 --- a/src/SchedulingAgent.cpp +++ b/src/SchedulingAgent.cpp @@ -60,6 +60,7 @@ bool SchedulingAgent::onTrigger(Processor *processor) try { processor->onTrigger(); + processor->decrementActiveTask(); } catch (Exception &exception) { http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/src/TimerDrivenSchedulingAgent.cpp ---------------------------------------------------------------------- diff --git a/src/TimerDrivenSchedulingAgent.cpp b/src/TimerDrivenSchedulingAgent.cpp index a5f8f3c..3ce57ae 100644 --- a/src/TimerDrivenSchedulingAgent.cpp +++ b/src/TimerDrivenSchedulingAgent.cpp @@ -74,7 +74,7 @@ void TimerDrivenSchedulingAgent::schedule(Processor *processor) _logger->log_info("Scheduled Time Driven thread %d running for process %s", thread->get_id(), processor->getName().c_str()); } - _threads[processor->getName().c_str()] = threads; + _threads[processor->getUUIDStr().c_str()] = threads; return; } @@ -105,6 +105,7 @@ void TimerDrivenSchedulingAgent::unschedule(Processor *processor) delete thread; } _threads.erase(processor->getUUIDStr()); + processor->clearActiveTask(); return; } http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/target/conf/nifi.properties ---------------------------------------------------------------------- diff --git a/target/conf/nifi.properties b/target/conf/nifi.properties index c4f7dff..b1902b1 100644 --- a/target/conf/nifi.properties +++ b/target/conf/nifi.properties @@ -183,3 +183,9 @@ nifi.cluster.manager.safemode.duration=0 sec # kerberos # nifi.kerberos.krb5.file= + +# Server +nifi.server.name=localhost +nifi.server.port=9000 +nifi.server.report.interval=1000 ms + http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/test/Server.cpp ---------------------------------------------------------------------- diff --git a/test/Server.cpp b/test/Server.cpp new file mode 100644 index 0000000..e7b3452 --- /dev/null +++ b/test/Server.cpp @@ -0,0 +1,607 @@ +/* A simple server in the internet domain using TCP + The port number is passed as an argument */ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <errno.h> +#include <arpa/inet.h> +#include <fcntl.h> +#include <netdb.h> +#include <string> +#include <errno.h> +#include <chrono> +#include <thread> +#include <iostream> // std::cout +#include <fstream> // std::ifstream +#include <signal.h> + +#define DEFAULT_NIFI_SERVER_PORT 9000 +#define DEFAULT_REPORT_INTERVAL 1000 // 1 sec +#define MAX_READ_TIMEOUT 30000 // 30 seconds + +//! FlowControl Protocol Msg Type +typedef enum { + REGISTER_REQ, // Device Register Request from device to server which contain device serial number, current running flow xml version + REGISTER_RESP, // Device Register Respond from server to device, may contain new flow.xml from server ask device to apply and also device report interval + REPORT_REQ, // Period Device Report from device to server which contain device serial number, current running flow xml name/version and other period report info + REPORT_RESP, // Report Respond from server to device, may ask device to update flow xml or processor property + MAX_FLOW_CONTROL_MSG_TYPE +} FlowControlMsgType; + +//! FlowControl Protocol Msg Type String +static const char *FlowControlMsgTypeStr[MAX_FLOW_CONTROL_MSG_TYPE] = +{ + "REGISTER_REQ", + "REGISTER_RESP", + "REPORT_REQ", + "REPORT_RESP" +}; + +//! Flow Control Msg Type to String +inline const char *FlowControlMsgTypeToStr(FlowControlMsgType type) +{ + if (type < MAX_FLOW_CONTROL_MSG_TYPE) + return FlowControlMsgTypeStr[type]; + else + return NULL; +} + +//! FlowControll Protocol Msg ID (Some Messages are fix length, Some are variable length (TLV) +typedef enum { + //Fix length 8 bytes: client to server in register request, required field + FLOW_SERIAL_NUMBER, + // Flow XML name TLV: client to server in register request and report request, required field + FLOW_XML_NAME, + // Flow XML content, TLV: server to client in register respond, option field in case server want to ask client to load xml from server + FLOW_XML_CONTENT, + // Fix length, 4 bytes Report interval in msec: server to client in register respond, option field + REPORT_INTERVAL, + // Processor Name TLV: server to client in report respond, option field in case server want to ask client to update processor property + PROCESSOR_NAME, + // Processor Property Name TLV: server to client in report respond, option field in case server want to ask client to update processor property + PROPERTY_NAME, + // Processor Property Value TLV: server to client in report respond, option field in case server want to ask client to update processor property + PROPERTY_VALUE, + // Report Blob TLV: client to server in report request, option field in case client want to pickyback the report blob in report request to server + REPORT_BLOB, + MAX_FLOW_MSG_ID +} FlowControlMsgID; + +//! FlowControl Protocol Msg ID String +static const char *FlowControlMsgIDStr[MAX_FLOW_MSG_ID] = +{ + "FLOW_SERIAL_NUMBER", + "FLOW_XML_NAME", + "FLOW_XML_CONTENT", + "REPORT_INTERVAL", + "PROCESSOR_NAME" + "PROPERTY_NAME", + "PROPERTY_VALUE", + "REPORT_BLOB" +}; + +#define TYPE_HDR_LEN 4 // Fix Hdr Type +#define TLV_HDR_LEN 8 // Type 4 bytes and Len 4 bytes + +//! FlowControl Protocol Msg Len +inline int FlowControlMsgIDEncodingLen(FlowControlMsgID id, int payLoadLen) +{ + if (id == FLOW_SERIAL_NUMBER) + return (TYPE_HDR_LEN + 8); + else if (id == REPORT_INTERVAL) + return (TYPE_HDR_LEN + 4); + else if (id < MAX_FLOW_MSG_ID) + return (TLV_HDR_LEN + payLoadLen); + else + return -1; +} + +//! Flow Control Msg Id to String +inline const char *FlowControlMsgIdToStr(FlowControlMsgID id) +{ + if (id < MAX_FLOW_MSG_ID) + return FlowControlMsgIDStr[id]; + else + return NULL; +} + +//! Flow Control Respond status code +typedef enum { + RESP_SUCCESS, + RESP_TRIGGER_REGISTER, // Server respond to client report to re trigger register + RESP_START_FLOW_CONTROLLER, // Server respond to client to start flow controller + RESP_STOP_FLOW_CONTROLLER, // Server respond to client to stop flow controller + RESP_FAILURE, + MAX_RESP_CODE +} FlowControlRespCode; + +//! FlowControl Resp Code str +static const char *FlowControlRespCodeStr[MAX_RESP_CODE] = +{ + "RESP_SUCCESS", + "RESP_TRIGGER_REGISTER", + "RESP_START_FLOW_CONTROLLER", + "RESP_STOP_FLOW_CONTROLLER", + "RESP_FAILURE" +}; + +//! Flow Control Resp Code to String +inline const char *FlowControlRespCodeToStr(FlowControlRespCode code) +{ + if (code < MAX_RESP_CODE) + return FlowControlRespCodeStr[code]; + else + return NULL; +} + +//! Common FlowControlProtocol Header +typedef struct { + uint32_t msgType; //! Msg Type + uint32_t seqNumber; //! Seq Number to match Req with Resp + uint32_t status; //! Resp Code, see FlowControlRespCode + uint32_t payloadLen; //! Msg Payload length +} FlowControlProtocolHeader; + + +//! encode uint32_t +uint8_t *encode(uint8_t *buf, uint32_t value) +{ + *buf++ = (value & 0xFF000000) >> 24; + *buf++ = (value & 0x00FF0000) >> 16; + *buf++ = (value & 0x0000FF00) >> 8; + *buf++ = (value & 0x000000FF); + return buf; +} + +//! encode uint32_t +uint8_t *decode(uint8_t *buf, uint32_t &value) +{ + value = ((buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|(buf[3])); + return (buf + 4); +} + +//! encode byte array +uint8_t *encode(uint8_t *buf, uint8_t *bufArray, int size) +{ + memcpy(buf, bufArray, size); + buf += size; + return buf; +} + +//! encode std::string +uint8_t *encode(uint8_t *buf, std::string value) +{ + // add the \0 for size + buf = encode(buf, value.size()+1); + buf = encode(buf, (uint8_t *) value.c_str(), value.size()+1); + return buf; +} + +int sendData(int socket, uint8_t *buf, int buflen) +{ + int ret = 0, bytes = 0; + + while (bytes < buflen) + { + ret = send(socket, buf+bytes, buflen-bytes, 0); + //check for errors + if (ret == -1) + { + return ret; + } + bytes+=ret; + } + + return bytes; +} + +void error(const char *msg) +{ + perror(msg); + exit(1); +} + +/* readline - read a '\n' terminated line from socket fd + into buffer bufptr of size len. The line in the + buffer is terminated with '\0'. + It returns -1 in case of error or if + the capacity of the buffer is exceeded. + It returns 0 if EOF is encountered before reading '\n'. + */ +int readline( int fd, char *bufptr, size_t len ) +{ + /* Note that this function is very tricky. It uses the + static variables bp, cnt, and b to establish a local buffer. + The recv call requests large chunks of data (the size of the buffer). + Then if the recv call reads more than one line, the overflow + remains in the buffer and it is made available to the next call + to readline. + Notice also that this routine reads up to '\n' and overwrites + it with '\0'. Thus if the line is really terminated with + "\r\n", the '\r' will remain unchanged. + */ + char *bufx = bufptr; + static char *bp; + static int cnt = 0; + static char b[ 4096 ]; + char c; + + while ( --len > 0 ) + { + if ( --cnt <= 0 ) + { + cnt = recv( fd, b, sizeof( b ), 0 ); + if ( cnt < 0 ) + { + if ( errno == EINTR ) + { + len++; /* the while will decrement */ + continue; + } + return -1; + } + if ( cnt == 0 ) + return 0; + bp = b; + } + c = *bp++; + *bufptr++ = c; + if ( c == '\n' ) + { + *bufptr = '\0'; + return bufptr - bufx; + } + } + return -1; +} + +int readData(int socket, uint8_t *buf, int buflen) +{ + int sendSize = buflen; + int status; + + while (buflen) + { +#ifndef __MACH__ + status = read(socket, buf, buflen); +#else + status = recv(socket, buf, buflen, 0); +#endif + if (status <= 0) + { + return status; + } + buflen -= status; + buf += status; + } + + return sendSize; +} + +int readHdr(int socket, FlowControlProtocolHeader *hdr) +{ + uint8_t buffer[sizeof(FlowControlProtocolHeader)]; + + uint8_t *data = buffer; + + int status = readData(socket, buffer, sizeof(FlowControlProtocolHeader)); + if (status <= 0) + return status; + + uint32_t value; + data = decode(data, value); + hdr->msgType = value; + + data = decode(data, value); + hdr->seqNumber = value; + + data = decode(data, value); + hdr->status = value; + + data = decode(data, value); + hdr->payloadLen = value; + + return sizeof(FlowControlProtocolHeader); +} + +int readXML(char **xmlContent) +{ + std::ifstream is ("conf/flowServer.xml", std::ifstream::binary); + if (is) { + // get length of file: + is.seekg (0, is.end); + int length = is.tellg(); + is.seekg (0, is.beg); + + char * buffer = new char [length]; + + printf("Reading %s len %d\n", "conf/flowServer.xml", length); + // read data as a block: + is.read (buffer,length); + + is.close(); + + // ...buffer contains the entire file... + *xmlContent = buffer; + + return length; + } + return 0; +} + +static int sockfd = 0, newsockfd = 0; +void sigHandler(int signal) +{ + if (signal == SIGINT || signal == SIGTERM) + { + close(newsockfd); + close(sockfd); + exit(1); + } +} + +int main(int argc, char *argv[]) +{ + int portno; + socklen_t clilen; + struct sockaddr_in serv_addr, cli_addr; + char buffer[4096]; + int flag = 0; + int number = 0; + + int n; + if (argc < 2) { + fprintf(stderr,"ERROR, no port provided\n"); + exit(1); + } + + if (signal(SIGINT, sigHandler) == SIG_ERR || signal(SIGTERM, sigHandler) == SIG_ERR) + { + + return -1; + } + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) + error("ERROR opening socket"); + bzero((char *) &serv_addr, sizeof(serv_addr)); + portno = atoi(argv[1]); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = INADDR_ANY; + serv_addr.sin_port = htons(portno); + if (bind(sockfd, (struct sockaddr *) &serv_addr, + sizeof(serv_addr)) < 0) + error("ERROR on binding"); + listen(sockfd,5); + if (portno == DEFAULT_NIFI_SERVER_PORT) + { + while (true) + { + clilen = sizeof(cli_addr); + newsockfd = accept(sockfd, + (struct sockaddr *) &cli_addr, + &clilen); + if (newsockfd < 0) + { + error("ERROR on accept"); + break; + } + // process request + FlowControlProtocolHeader hdr; + int status = readHdr(newsockfd, &hdr); + if (status > 0) + { + printf("Flow Control Protocol receive MsgType %s\n", FlowControlMsgTypeToStr((FlowControlMsgType) hdr.msgType)); + printf("Flow Control Protocol receive Seq Num %d\n", hdr.seqNumber); + printf("Flow Control Protocol receive Resp Code %s\n", FlowControlRespCodeToStr((FlowControlRespCode) hdr.status)); + printf("Flow Control Protocol receive Payload len %d\n", hdr.payloadLen); + if (((FlowControlMsgType) hdr.msgType) == REGISTER_REQ) + { + printf("Flow Control Protocol Register Req receive\n"); + uint8_t *payload = new uint8_t[hdr.payloadLen]; + uint8_t *payloadPtr = payload; + status = readData(newsockfd, payload, hdr.payloadLen); + while (status > 0 && payloadPtr < (payload + hdr.payloadLen)) + { + uint32_t msgID = 0xFFFFFFFF; + payloadPtr = decode(payloadPtr, msgID); + if (((FlowControlMsgID) msgID) == FLOW_SERIAL_NUMBER) + { + // Fixed 8 bytes + uint8_t seqNum[8]; + memcpy(seqNum, payloadPtr, 8); + printf("Flow Control Protocol Register Req receive serial num\n"); + payloadPtr += 8; + } + else if (((FlowControlMsgID) msgID) == FLOW_XML_NAME) + { + uint32_t len; + payloadPtr = decode(payloadPtr, len); + printf("Flow Control Protocol receive XML name length %d\n", len); + std::string flowName = (const char *) payloadPtr; + payloadPtr += len; + printf("Flow Control Protocol receive XML name %s\n", flowName.c_str()); + } + else + { + break; + } + } + delete[] payload; + // Send Register Respond + // Calculate the total payload msg size + char *xmlContent; + uint32_t xmlLen = readXML(&xmlContent); + uint32_t payloadSize = FlowControlMsgIDEncodingLen(REPORT_INTERVAL, 0); + if (xmlLen > 0) + payloadSize += FlowControlMsgIDEncodingLen(FLOW_XML_CONTENT, xmlLen); + + uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; + uint8_t *data = new uint8_t[size]; + uint8_t *start = data; + + // encode the HDR + hdr.msgType = REGISTER_RESP; + hdr.payloadLen = payloadSize; + hdr.status = RESP_SUCCESS; + data = encode(data, hdr.msgType); + data = encode(data, hdr.seqNumber); + data = encode(data, hdr.status); + data = encode(data, hdr.payloadLen); + + // encode the report interval + data = encode(data, REPORT_INTERVAL); + data = encode(data, DEFAULT_REPORT_INTERVAL); + + // encode the XML content + if (xmlLen > 0) + { + data = encode(data, FLOW_XML_CONTENT); + data = encode(data, xmlLen); + data = encode(data, (uint8_t *) xmlContent, xmlLen); + delete[] xmlContent; + } + + // send it + status = sendData(newsockfd, start, size); + delete[] start; + } + else if (((FlowControlMsgType) hdr.msgType) == REPORT_REQ) + { + printf("Flow Control Protocol Report Req receive\n"); + uint8_t *payload = new uint8_t[hdr.payloadLen]; + uint8_t *payloadPtr = payload; + status = readData(newsockfd, payload, hdr.payloadLen); + while (status > 0 && payloadPtr < (payload + hdr.payloadLen)) + { + uint32_t msgID = 0xFFFFFFFF; + payloadPtr = decode(payloadPtr, msgID); + if (((FlowControlMsgID) msgID) == FLOW_XML_NAME) + { + uint32_t len; + payloadPtr = decode(payloadPtr, len); + printf("Flow Control Protocol receive XML name length %d\n", len); + std::string flowName = (const char *) payloadPtr; + payloadPtr += len; + printf("Flow Control Protocol receive XML name %s\n", flowName.c_str()); + } + else + { + break; + } + } + delete[] payload; + // Send Register Respond + // Calculate the total payload msg size + std::string processor = "RealTimeDataCollector"; + std::string propertyName1 = "real Time Message ID"; + std::string propertyValue1 = "41"; + std::string propertyName2 = "Batch Message ID"; + std::string propertyValue2 = "172,30,48"; + if (flag == 0) + { + propertyName1 = "Real Time Message ID"; + propertyValue1 = "41"; + propertyName2 = "Batch Message ID"; + propertyValue2 = "172,48"; + flag = 1; + } + else if (flag == 1) + { + propertyName1 = "Real Time Message ID"; + propertyValue1 = "172,48"; + propertyName2 = "Batch Message ID"; + propertyValue2 = "41"; + flag = 0; + } + uint32_t payloadSize = FlowControlMsgIDEncodingLen(PROCESSOR_NAME, processor.size()+1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName1.size()+1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue1.size()+1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_NAME, propertyName2.size()+1); + payloadSize += FlowControlMsgIDEncodingLen(PROPERTY_VALUE, propertyValue2.size()+1); + + uint32_t size = sizeof(FlowControlProtocolHeader) + payloadSize; + uint8_t *data = new uint8_t[size]; + uint8_t *start = data; + + // encode the HDR + hdr.msgType = REPORT_RESP; + hdr.payloadLen = payloadSize; + hdr.status = RESP_SUCCESS; + + if (number >= 10 && number < 20) + { + // After 10 second report, stop the flow controller for 10 second + hdr.status = RESP_STOP_FLOW_CONTROLLER; + } + else if (number == 20) + { + // restart the flow controller after 10 second + hdr.status = RESP_START_FLOW_CONTROLLER; + } + else if (number == 30) + { + // retrigger register + hdr.status = RESP_TRIGGER_REGISTER; + number = 0; + } + + number++; + + data = encode(data, hdr.msgType); + data = encode(data, hdr.seqNumber); + data = encode(data, hdr.status); + data = encode(data, hdr.payloadLen); + + // encode the processorName + data = encode(data, PROCESSOR_NAME); + data = encode(data, processor); + + // encode the propertyName and value TLV + data = encode(data, PROPERTY_NAME); + data = encode(data, propertyName1); + data = encode(data, PROPERTY_VALUE); + data = encode(data, propertyValue1); + data = encode(data, PROPERTY_NAME); + data = encode(data, propertyName2); + data = encode(data, PROPERTY_VALUE); + data = encode(data, propertyValue2); + // send it + status = sendData(newsockfd, start, size); + delete[] start; + } + } + close(newsockfd); + } + close(sockfd); + } + else + { + clilen = sizeof(cli_addr); + newsockfd = accept(sockfd, + (struct sockaddr *) &cli_addr, + &clilen); + if (newsockfd < 0) + error("ERROR on accept"); + while (1) + { + bzero(buffer,4096); + n = readline(newsockfd,buffer,4095); + if (n <= 0 ) + { + close(newsockfd); + newsockfd = accept(sockfd, + (struct sockaddr *) &cli_addr, + &clilen); + continue; + } + printf("%s",buffer); + } + close(newsockfd); + close(sockfd); + } + return 0; +} http://git-wip-us.apache.org/repos/asf/nifi-minifi-cpp/blob/7956696e/thirdparty/libxml2/AUTHORS ---------------------------------------------------------------------- diff --git a/thirdparty/libxml2/AUTHORS b/thirdparty/libxml2/AUTHORS new file mode 100644 index 0000000..cf2e9a6 --- /dev/null +++ b/thirdparty/libxml2/AUTHORS @@ -0,0 +1,5 @@ +Daniel Veillard <dan...@veillard.com> +Bjorn Reese <bre...@users.sourceforge.net> +William Brack <wbr...@mmm.com.hk> +Igor Zlatkovic <i...@zlatkovic.com> for the Windows port +Aleksey Sanin <alek...@aleksey.com>